001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiConsumer;
042import java.util.function.Consumer;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CacheEvictionStats;
050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
051import org.apache.hadoop.hbase.CatalogFamilyFormat;
052import org.apache.hadoop.hbase.ClientMetaTableAccessor;
053import org.apache.hadoop.hbase.ClusterMetrics;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.ClusterMetricsBuilder;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HRegionLocation;
058import org.apache.hadoop.hbase.NamespaceDescriptor;
059import org.apache.hadoop.hbase.RegionLocations;
060import org.apache.hadoop.hbase.RegionMetrics;
061import org.apache.hadoop.hbase.RegionMetricsBuilder;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableExistsException;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.TableNotDisabledException;
066import org.apache.hadoop.hbase.TableNotEnabledException;
067import org.apache.hadoop.hbase.TableNotFoundException;
068import org.apache.hadoop.hbase.UnknownRegionException;
069import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.Scan.ReadType;
073import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
074import org.apache.hadoop.hbase.client.replication.TableCFs;
075import org.apache.hadoop.hbase.client.security.SecurityCapability;
076import org.apache.hadoop.hbase.exceptions.DeserializationException;
077import org.apache.hadoop.hbase.ipc.HBaseRpcController;
078import org.apache.hadoop.hbase.net.Address;
079import org.apache.hadoop.hbase.quotas.QuotaFilter;
080import org.apache.hadoop.hbase.quotas.QuotaSettings;
081import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
082import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
083import org.apache.hadoop.hbase.replication.ReplicationException;
084import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
085import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
086import org.apache.hadoop.hbase.replication.SyncReplicationState;
087import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
088import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
089import org.apache.hadoop.hbase.security.access.Permission;
090import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
091import org.apache.hadoop.hbase.security.access.UserPermission;
092import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
094import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
095import org.apache.hadoop.hbase.util.Bytes;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
098import org.apache.hadoop.hbase.util.Pair;
099import org.apache.yetus.audience.InterfaceAudience;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
104import org.apache.hbase.thirdparty.com.google.protobuf.Message;
105import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
106import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
107import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
108import org.apache.hbase.thirdparty.io.netty.util.Timeout;
109import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
110import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
111
112import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
113import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
349import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
350
351/**
352 * The implementation of AsyncAdmin.
353 * <p>
354 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
355 * be finished inside the rpc framework thread, which means that the callbacks registered to the
356 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
357 * this class should not try to do time consuming tasks in the callbacks.
358 * @since 2.0.0
359 * @see AsyncHBaseAdmin
360 * @see AsyncConnection#getAdmin()
361 * @see AsyncConnection#getAdminBuilder()
362 */
363@InterfaceAudience.Private
364class RawAsyncHBaseAdmin implements AsyncAdmin {
365
366  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
367
368  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
369
370  private final AsyncConnectionImpl connection;
371
372  private final HashedWheelTimer retryTimer;
373
374  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
375
376  private final long rpcTimeoutNs;
377
378  private final long operationTimeoutNs;
379
380  private final long pauseNs;
381
382  private final long pauseNsForServerOverloaded;
383
384  private final int maxAttempts;
385
386  private final int startLogErrorsCnt;
387
388  private final NonceGenerator ng;
389
390  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
391    AsyncAdminBuilderBase builder) {
392    this.connection = connection;
393    this.retryTimer = retryTimer;
394    this.metaTable = connection.getTable(META_TABLE_NAME);
395    this.rpcTimeoutNs = builder.rpcTimeoutNs;
396    this.operationTimeoutNs = builder.operationTimeoutNs;
397    this.pauseNs = builder.pauseNs;
398    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
399      LOG.warn(
400        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
401          + " the normal pause value {} ms, use the greater one instead",
402        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
403        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
404      this.pauseNsForServerOverloaded = builder.pauseNs;
405    } else {
406      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
407    }
408    this.maxAttempts = builder.maxAttempts;
409    this.startLogErrorsCnt = builder.startLogErrorsCnt;
410    this.ng = connection.getNonceGenerator();
411  }
412
413  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
414    return this.connection.callerFactory.<T> masterRequest()
415      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
416      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
417      .pause(pauseNs, TimeUnit.NANOSECONDS)
418      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
419      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
420  }
421
422  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
423    return this.connection.callerFactory.<T> adminRequest()
424      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
425      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
426      .pause(pauseNs, TimeUnit.NANOSECONDS)
427      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
428      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
429  }
430
431  @FunctionalInterface
432  private interface MasterRpcCall<RESP, REQ> {
433    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
434      RpcCallback<RESP> done);
435  }
436
437  @FunctionalInterface
438  private interface AdminRpcCall<RESP, REQ> {
439    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
440      RpcCallback<RESP> done);
441  }
442
443  @FunctionalInterface
444  private interface Converter<D, S> {
445    D convert(S src) throws IOException;
446  }
447
448  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
449    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
450    Converter<RESP, PRESP> respConverter) {
451    CompletableFuture<RESP> future = new CompletableFuture<>();
452    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
453
454      @Override
455      public void run(PRESP resp) {
456        if (controller.failed()) {
457          future.completeExceptionally(controller.getFailed());
458        } else {
459          try {
460            future.complete(respConverter.convert(resp));
461          } catch (IOException e) {
462            future.completeExceptionally(e);
463          }
464        }
465      }
466    });
467    return future;
468  }
469
470  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
471    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
472    Converter<RESP, PRESP> respConverter) {
473    CompletableFuture<RESP> future = new CompletableFuture<>();
474    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
475
476      @Override
477      public void run(PRESP resp) {
478        if (controller.failed()) {
479          future.completeExceptionally(controller.getFailed());
480        } else {
481          try {
482            future.complete(respConverter.convert(resp));
483          } catch (IOException e) {
484            future.completeExceptionally(e);
485          }
486        }
487      }
488    });
489    return future;
490  }
491
492  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
493    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
494    ProcedureBiConsumer consumer) {
495    return procedureCall(b -> {
496    }, preq, rpcCall, respConverter, consumer);
497  }
498
499  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
500    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
501    ProcedureBiConsumer consumer) {
502    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
503  }
504
505  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
506    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
507    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
508    ProcedureBiConsumer consumer) {
509    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
510      stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
511    prioritySetter.accept(builder);
512    CompletableFuture<Long> procFuture = builder.call();
513    CompletableFuture<Void> future = waitProcedureResult(procFuture);
514    addListener(future, consumer);
515    return future;
516  }
517
518  @Override
519  public CompletableFuture<Boolean> tableExists(TableName tableName) {
520    if (TableName.isMetaTableName(tableName)) {
521      return CompletableFuture.completedFuture(true);
522    }
523    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
524  }
525
526  @Override
527  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
528    return getTableDescriptors(
529      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
530  }
531
532  /**
533   * {@link #listTableDescriptors(boolean)}
534   */
535  @Override
536  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
537    boolean includeSysTables) {
538    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
539      + "use listTableDescriptors(boolean) instead");
540    return getTableDescriptors(
541      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
542  }
543
544  @Override
545  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
546    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
547      + "use listTableDescriptors(boolean) instead");
548    if (tableNames.isEmpty()) {
549      return CompletableFuture.completedFuture(Collections.emptyList());
550    }
551    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
552  }
553
554  private CompletableFuture<List<TableDescriptor>>
555    getTableDescriptors(GetTableDescriptorsRequest request) {
556    return this.<List<TableDescriptor>> newMasterCaller()
557      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
558        List<TableDescriptor>> call(controller, stub, request,
559          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
560          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
561      .call();
562  }
563
564  @Override
565  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
566    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
567  }
568
569  @Override
570  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
571    boolean includeSysTables) {
572    Preconditions.checkNotNull(pattern,
573      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
574    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
575  }
576
577  @Override
578  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
579    return this.<List<TableName>> newMasterCaller()
580      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
581        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
582          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
583          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
584          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
585      .call();
586  }
587
588  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
589    return this.<List<TableName>> newMasterCaller()
590      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
591        List<TableName>> call(controller, stub, request,
592          (s, c, req, done) -> s.getTableNames(c, req, done),
593          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
594      .call();
595  }
596
597  @Override
598  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
599    return this.<List<TableDescriptor>> newMasterCaller()
600      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
601        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
602          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
603          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
604          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
605      .call();
606  }
607
608  @Override
609  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
610    return this.<List<TableDescriptor>> newMasterCaller()
611      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
612        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
613          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
614          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
615          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
616      .call();
617  }
618
619  @Override
620  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
621    return this.<List<TableName>> newMasterCaller()
622      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
623        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
624          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
625          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
626          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
627      .call();
628  }
629
630  @Override
631  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
632    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
633    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
634      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
635        List<TableSchema>> call(controller, stub,
636          RequestConverter.buildGetTableDescriptorsRequest(tableName),
637          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
638          (resp) -> resp.getTableSchemaList()))
639      .call(), (tableSchemas, error) -> {
640        if (error != null) {
641          future.completeExceptionally(error);
642          return;
643        }
644        if (!tableSchemas.isEmpty()) {
645          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
646        } else {
647          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
648        }
649      });
650    return future;
651  }
652
653  @Override
654  public CompletableFuture<Void> createTable(TableDescriptor desc) {
655    return createTable(desc.getTableName(),
656      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
657  }
658
659  @Override
660  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
661    int numRegions) {
662    try {
663      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
664    } catch (IllegalArgumentException e) {
665      return failedFuture(e);
666    }
667  }
668
669  @Override
670  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
671    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
672      + " use createTable(TableDescriptor) instead");
673    try {
674      verifySplitKeys(splitKeys);
675      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
676        splitKeys, ng.getNonceGroup(), ng.newNonce()));
677    } catch (IllegalArgumentException e) {
678      return failedFuture(e);
679    }
680  }
681
682  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
683    Preconditions.checkNotNull(tableName, "table name is null");
684    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
685      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
686      new CreateTableProcedureBiConsumer(tableName));
687  }
688
689  @Override
690  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
691    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
692      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
693        ng.newNonce()),
694      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
695      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
696  }
697
698  @Override
699  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
700    return this.<ModifyTableStoreFileTrackerRequest,
701      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
702        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
703          ng.getNonceGroup(), ng.newNonce()),
704        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
705        (resp) -> resp.getProcId(),
706        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
707  }
708
709  @Override
710  public CompletableFuture<Void> deleteTable(TableName tableName) {
711    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
712      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
713      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
714      new DeleteTableProcedureBiConsumer(tableName));
715  }
716
717  @Override
718  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
719    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
720      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
721        ng.newNonce()),
722      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
723      new TruncateTableProcedureBiConsumer(tableName));
724  }
725
726  @Override
727  public CompletableFuture<Void> enableTable(TableName tableName) {
728    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
729      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
730      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
731      new EnableTableProcedureBiConsumer(tableName));
732  }
733
734  @Override
735  public CompletableFuture<Void> disableTable(TableName tableName) {
736    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
737      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
738      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
739      new DisableTableProcedureBiConsumer(tableName));
740  }
741
742  /**
743   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
744   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
745   * targetState).
746   */
747  private static CompletableFuture<Boolean> completeCheckTableState(
748    CompletableFuture<Boolean> future, TableState tableState, Throwable error,
749    TableState.State targetState, TableName tableName) {
750    if (error != null) {
751      future.completeExceptionally(error);
752    } else {
753      if (tableState != null) {
754        future.complete(tableState.inStates(targetState));
755      } else {
756        future.completeExceptionally(new TableNotFoundException(tableName));
757      }
758    }
759    return future;
760  }
761
762  @Override
763  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
764    if (TableName.isMetaTableName(tableName)) {
765      return CompletableFuture.completedFuture(true);
766    }
767    CompletableFuture<Boolean> future = new CompletableFuture<>();
768    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
769      (tableState, error) -> {
770        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
771          TableState.State.ENABLED, tableName);
772      });
773    return future;
774  }
775
776  @Override
777  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
778    if (TableName.isMetaTableName(tableName)) {
779      return CompletableFuture.completedFuture(false);
780    }
781    CompletableFuture<Boolean> future = new CompletableFuture<>();
782    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
783      (tableState, error) -> {
784        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
785          TableState.State.DISABLED, tableName);
786      });
787    return future;
788  }
789
790  @Override
791  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
792    if (TableName.isMetaTableName(tableName)) {
793      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
794        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
795    }
796    CompletableFuture<Boolean> future = new CompletableFuture<>();
797    addListener(isTableEnabled(tableName), (enabled, error) -> {
798      if (error != null) {
799        if (error instanceof TableNotFoundException) {
800          future.complete(false);
801        } else {
802          future.completeExceptionally(error);
803        }
804        return;
805      }
806      if (!enabled) {
807        future.complete(false);
808      } else {
809        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
810          (locations, error1) -> {
811            if (error1 != null) {
812              future.completeExceptionally(error1);
813              return;
814            }
815            List<HRegionLocation> notDeployedRegions = locations.stream()
816              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
817            if (notDeployedRegions.size() > 0) {
818              if (LOG.isDebugEnabled()) {
819                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
820              }
821              future.complete(false);
822              return;
823            }
824            future.complete(true);
825          });
826      }
827    });
828    return future;
829  }
830
831  @Override
832  public CompletableFuture<Void> addColumnFamily(TableName tableName,
833    ColumnFamilyDescriptor columnFamily) {
834    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
835      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
836        ng.newNonce()),
837      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
838      new AddColumnFamilyProcedureBiConsumer(tableName));
839  }
840
841  @Override
842  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
843    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
844      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
845        ng.newNonce()),
846      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
847      new DeleteColumnFamilyProcedureBiConsumer(tableName));
848  }
849
850  @Override
851  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
852    ColumnFamilyDescriptor columnFamily) {
853    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
854      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
855        ng.newNonce()),
856      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
857      new ModifyColumnFamilyProcedureBiConsumer(tableName));
858  }
859
860  @Override
861  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
862    byte[] family, String dstSFT) {
863    return this.<ModifyColumnStoreFileTrackerRequest,
864      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
865        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
866          ng.getNonceGroup(), ng.newNonce()),
867        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
868        (resp) -> resp.getProcId(),
869        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
870  }
871
872  @Override
873  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
874    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
875      RequestConverter.buildCreateNamespaceRequest(descriptor),
876      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
877      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
878  }
879
880  @Override
881  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
882    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
883      RequestConverter.buildModifyNamespaceRequest(descriptor),
884      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
885      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
886  }
887
888  @Override
889  public CompletableFuture<Void> deleteNamespace(String name) {
890    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
891      RequestConverter.buildDeleteNamespaceRequest(name),
892      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
893      new DeleteNamespaceProcedureBiConsumer(name));
894  }
895
896  @Override
897  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
898    return this.<NamespaceDescriptor> newMasterCaller()
899      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
900        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
901          RequestConverter.buildGetNamespaceDescriptorRequest(name),
902          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
903          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
904      .call();
905  }
906
907  @Override
908  public CompletableFuture<List<String>> listNamespaces() {
909    return this.<List<String>> newMasterCaller()
910      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
911        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
912          (s, c, req, done) -> s.listNamespaces(c, req, done),
913          (resp) -> resp.getNamespaceNameList()))
914      .call();
915  }
916
917  @Override
918  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
919    return this.<List<NamespaceDescriptor>> newMasterCaller()
920      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
921        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
922          ListNamespaceDescriptorsRequest.newBuilder().build(),
923          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
924          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
925      .call();
926  }
927
928  @Override
929  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
930    return this.<List<RegionInfo>> newAdminCaller()
931      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
932        List<RegionInfo>> adminCall(controller, stub,
933          RequestConverter.buildGetOnlineRegionRequest(),
934          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
935          resp -> ProtobufUtil.getRegionInfos(resp)))
936      .serverName(serverName).call();
937  }
938
939  @Override
940  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
941    if (tableName.equals(META_TABLE_NAME)) {
942      return connection.registry.getMetaRegionLocations()
943        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
944          .collect(Collectors.toList()));
945    } else {
946      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
947        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
948    }
949  }
950
951  @Override
952  public CompletableFuture<Void> flush(TableName tableName) {
953    return flush(tableName, null);
954  }
955
956  @Override
957  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
958    CompletableFuture<Void> future = new CompletableFuture<>();
959    addListener(tableExists(tableName), (exists, err) -> {
960      if (err != null) {
961        future.completeExceptionally(err);
962      } else if (!exists) {
963        future.completeExceptionally(new TableNotFoundException(tableName));
964      } else {
965        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
966          if (err2 != null) {
967            future.completeExceptionally(err2);
968          } else if (!tableEnabled) {
969            future.completeExceptionally(new TableNotEnabledException(tableName));
970          } else {
971            Map<String, String> props = new HashMap<>();
972            if (columnFamily != null) {
973              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
974            }
975            addListener(
976              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
977              (ret, err3) -> {
978                if (err3 != null) {
979                  future.completeExceptionally(err3);
980                } else {
981                  future.complete(ret);
982                }
983              });
984          }
985        });
986      }
987    });
988    return future;
989  }
990
991  @Override
992  public CompletableFuture<Void> flushRegion(byte[] regionName) {
993    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
994    });
995  }
996
997  @Override
998  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
999    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1000      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
1001    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
1002    });
1003  }
1004
1005  /**
1006   * This method is for internal use only, where we need the response of the flush.
1007   * <p/>
1008   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1009   * API.
1010   */
1011  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1012    boolean writeFlushWALMarker) {
1013    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1014    addListener(getRegionLocation(regionName), (location, err) -> {
1015      if (err != null) {
1016        future.completeExceptionally(err);
1017        return;
1018      }
1019      ServerName serverName = location.getServerName();
1020      if (serverName == null) {
1021        future
1022          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1023        return;
1024      }
1025      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1026        (ret, err2) -> {
1027          if (err2 != null) {
1028            future.completeExceptionally(err2);
1029          } else {
1030            future.complete(ret);
1031          }
1032        });
1033    });
1034    return future;
1035  }
1036
1037  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1038    byte[] columnFamily, boolean writeFlushWALMarker) {
1039    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1040      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1041        FlushRegionResponse> adminCall(controller, stub,
1042          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1043            writeFlushWALMarker),
1044          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1045      .call();
1046  }
1047
1048  @Override
1049  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1050    CompletableFuture<Void> future = new CompletableFuture<>();
1051    addListener(getRegions(sn), (hRegionInfos, err) -> {
1052      if (err != null) {
1053        future.completeExceptionally(err);
1054        return;
1055      }
1056      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1057      if (hRegionInfos != null) {
1058        hRegionInfos
1059          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1060          })));
1061      }
1062      addListener(CompletableFuture.allOf(
1063        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1064          if (err2 != null) {
1065            future.completeExceptionally(err2);
1066          } else {
1067            future.complete(ret);
1068          }
1069        });
1070    });
1071    return future;
1072  }
1073
1074  @Override
1075  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1076    return compact(tableName, null, false, compactType);
1077  }
1078
1079  @Override
1080  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1081    CompactType compactType) {
1082    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1083      + "If you don't specify a columnFamily, use compact(TableName) instead");
1084    return compact(tableName, columnFamily, false, compactType);
1085  }
1086
1087  @Override
1088  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1089    return compactRegion(regionName, null, false);
1090  }
1091
1092  @Override
1093  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1094    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1095      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1096    return compactRegion(regionName, columnFamily, false);
1097  }
1098
1099  @Override
1100  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1101    return compact(tableName, null, true, compactType);
1102  }
1103
1104  @Override
1105  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1106    CompactType compactType) {
1107    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1108      + "If you don't specify a columnFamily, use compact(TableName) instead");
1109    return compact(tableName, columnFamily, true, compactType);
1110  }
1111
1112  @Override
1113  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1114    return compactRegion(regionName, null, true);
1115  }
1116
1117  @Override
1118  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1119    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1120      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1121    return compactRegion(regionName, columnFamily, true);
1122  }
1123
1124  @Override
1125  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1126    return compactRegionServer(sn, false);
1127  }
1128
1129  @Override
1130  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1131    return compactRegionServer(sn, true);
1132  }
1133
1134  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1135    CompletableFuture<Void> future = new CompletableFuture<>();
1136    addListener(getRegions(sn), (hRegionInfos, err) -> {
1137      if (err != null) {
1138        future.completeExceptionally(err);
1139        return;
1140      }
1141      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1142      if (hRegionInfos != null) {
1143        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1144      }
1145      addListener(CompletableFuture.allOf(
1146        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1147          if (err2 != null) {
1148            future.completeExceptionally(err2);
1149          } else {
1150            future.complete(ret);
1151          }
1152        });
1153    });
1154    return future;
1155  }
1156
1157  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1158    boolean major) {
1159    CompletableFuture<Void> future = new CompletableFuture<>();
1160    addListener(getRegionLocation(regionName), (location, err) -> {
1161      if (err != null) {
1162        future.completeExceptionally(err);
1163        return;
1164      }
1165      ServerName serverName = location.getServerName();
1166      if (serverName == null) {
1167        future
1168          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1169        return;
1170      }
1171      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1172        (ret, err2) -> {
1173          if (err2 != null) {
1174            future.completeExceptionally(err2);
1175          } else {
1176            future.complete(ret);
1177          }
1178        });
1179    });
1180    return future;
1181  }
1182
1183  /**
1184   * List all region locations for the specific table.
1185   */
1186  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1187    if (TableName.META_TABLE_NAME.equals(tableName)) {
1188      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1189      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1190        if (err != null) {
1191          future.completeExceptionally(err);
1192        } else if (
1193          metaRegions == null || metaRegions.isEmpty()
1194            || metaRegions.getDefaultRegionLocation() == null
1195        ) {
1196          future.completeExceptionally(new IOException("meta region does not found"));
1197        } else {
1198          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1199        }
1200      });
1201      return future;
1202    } else {
1203      // For non-meta table, we fetch all locations by scanning hbase:meta table
1204      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1205    }
1206  }
1207
1208  /**
1209   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1210   */
1211  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1212    CompactType compactType) {
1213    CompletableFuture<Void> future = new CompletableFuture<>();
1214
1215    switch (compactType) {
1216      case MOB:
1217        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1218          if (err != null) {
1219            future.completeExceptionally(err);
1220            return;
1221          }
1222          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1223          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1224            if (err2 != null) {
1225              future.completeExceptionally(err2);
1226            } else {
1227              future.complete(ret);
1228            }
1229          });
1230        });
1231        break;
1232      case NORMAL:
1233        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1234          if (err != null) {
1235            future.completeExceptionally(err);
1236            return;
1237          }
1238          if (locations == null || locations.isEmpty()) {
1239            future.completeExceptionally(new TableNotFoundException(tableName));
1240          }
1241          CompletableFuture<?>[] compactFutures =
1242            locations.stream().filter(l -> l.getRegion() != null)
1243              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1244              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1245              .toArray(CompletableFuture<?>[]::new);
1246          // future complete unless all of the compact futures are completed.
1247          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1248            if (err2 != null) {
1249              future.completeExceptionally(err2);
1250            } else {
1251              future.complete(ret);
1252            }
1253          });
1254        });
1255        break;
1256      default:
1257        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1258    }
1259    return future;
1260  }
1261
1262  /**
1263   * Compact the region at specific region server.
1264   */
1265  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1266    final boolean major, byte[] columnFamily) {
1267    return this.<Void> newAdminCaller().serverName(sn)
1268      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1269        Void> adminCall(controller, stub,
1270          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1271          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1272      .call();
1273  }
1274
1275  private byte[] toEncodeRegionName(byte[] regionName) {
1276    return RegionInfo.isEncodedRegionName(regionName)
1277      ? regionName
1278      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1279  }
1280
1281  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1282    CompletableFuture<TableName> result) {
1283    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1284      if (err != null) {
1285        result.completeExceptionally(err);
1286        return;
1287      }
1288      RegionInfo regionInfo = location.getRegion();
1289      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1290        result.completeExceptionally(
1291          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1292        return;
1293      }
1294      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1295        if (!tableName.get().equals(regionInfo.getTable())) {
1296          // tables of this two region should be same.
1297          result.completeExceptionally(
1298            new IllegalArgumentException("Cannot merge regions from two different tables "
1299              + tableName.get() + " and " + regionInfo.getTable()));
1300        } else {
1301          result.complete(tableName.get());
1302        }
1303      }
1304    });
1305  }
1306
1307  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1308    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1309    CompletableFuture<TableName> future = new CompletableFuture<>();
1310    for (byte[] encodedRegionName : encodedRegionNames) {
1311      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1312    }
1313    return future;
1314  }
1315
1316  @Override
1317  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1318    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1319  }
1320
1321  @Override
1322  public CompletableFuture<Boolean> isMergeEnabled() {
1323    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1324  }
1325
1326  @Override
1327  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1328    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1329  }
1330
1331  @Override
1332  public CompletableFuture<Boolean> isSplitEnabled() {
1333    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1334  }
1335
1336  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1337    MasterSwitchType switchType) {
1338    SetSplitOrMergeEnabledRequest request =
1339      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1340    return this.<Boolean> newMasterCaller()
1341      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1342        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1343          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1344          (resp) -> resp.getPrevValueList().get(0)))
1345      .call();
1346  }
1347
1348  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1349    IsSplitOrMergeEnabledRequest request =
1350      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1351    return this.<Boolean> newMasterCaller()
1352      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1353        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1354          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1355      .call();
1356  }
1357
1358  @Override
1359  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1360    if (nameOfRegionsToMerge.size() < 2) {
1361      return failedFuture(new IllegalArgumentException(
1362        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1363    }
1364    CompletableFuture<Void> future = new CompletableFuture<>();
1365    byte[][] encodedNameOfRegionsToMerge =
1366      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1367
1368    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1369      if (err != null) {
1370        future.completeExceptionally(err);
1371        return;
1372      }
1373
1374      final MergeTableRegionsRequest request;
1375      try {
1376        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1377          forcible, ng.getNonceGroup(), ng.newNonce());
1378      } catch (DeserializationException e) {
1379        future.completeExceptionally(e);
1380        return;
1381      }
1382
1383      addListener(
1384        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1385          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1386        (ret, err2) -> {
1387          if (err2 != null) {
1388            future.completeExceptionally(err2);
1389          } else {
1390            future.complete(ret);
1391          }
1392        });
1393    });
1394    return future;
1395  }
1396
1397  @Override
1398  public CompletableFuture<Void> split(TableName tableName) {
1399    CompletableFuture<Void> future = new CompletableFuture<>();
1400    addListener(tableExists(tableName), (exist, error) -> {
1401      if (error != null) {
1402        future.completeExceptionally(error);
1403        return;
1404      }
1405      if (!exist) {
1406        future.completeExceptionally(new TableNotFoundException(tableName));
1407        return;
1408      }
1409      addListener(metaTable
1410        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1411          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1412            ClientMetaTableAccessor.QueryType.REGION))
1413          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1414            ClientMetaTableAccessor.QueryType.REGION))),
1415        (results, err2) -> {
1416          if (err2 != null) {
1417            future.completeExceptionally(err2);
1418            return;
1419          }
1420          if (results != null && !results.isEmpty()) {
1421            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1422            for (Result r : results) {
1423              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1424                continue;
1425              }
1426              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1427              if (rl != null) {
1428                for (HRegionLocation h : rl.getRegionLocations()) {
1429                  if (h != null && h.getServerName() != null) {
1430                    RegionInfo hri = h.getRegion();
1431                    if (
1432                      hri == null || hri.isSplitParent()
1433                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1434                    ) {
1435                      continue;
1436                    }
1437                    splitFutures.add(split(hri, null));
1438                  }
1439                }
1440              }
1441            }
1442            addListener(
1443              CompletableFuture
1444                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1445              (ret, exception) -> {
1446                if (exception != null) {
1447                  future.completeExceptionally(exception);
1448                  return;
1449                }
1450                future.complete(ret);
1451              });
1452          } else {
1453            future.complete(null);
1454          }
1455        });
1456    });
1457    return future;
1458  }
1459
1460  @Override
1461  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1462    CompletableFuture<Void> result = new CompletableFuture<>();
1463    if (splitPoint == null) {
1464      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1465    }
1466    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1467      (loc, err) -> {
1468        if (err != null) {
1469          result.completeExceptionally(err);
1470        } else if (loc == null || loc.getRegion() == null) {
1471          result.completeExceptionally(new IllegalArgumentException(
1472            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1473        } else {
1474          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1475            if (err2 != null) {
1476              result.completeExceptionally(err2);
1477            } else {
1478              result.complete(ret);
1479            }
1480
1481          });
1482        }
1483      });
1484    return result;
1485  }
1486
1487  @Override
1488  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1489    CompletableFuture<Void> future = new CompletableFuture<>();
1490    addListener(getRegionLocation(regionName), (location, err) -> {
1491      if (err != null) {
1492        future.completeExceptionally(err);
1493        return;
1494      }
1495      RegionInfo regionInfo = location.getRegion();
1496      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1497        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1498          + "Replicas are auto-split when their primary is split."));
1499        return;
1500      }
1501      ServerName serverName = location.getServerName();
1502      if (serverName == null) {
1503        future
1504          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1505        return;
1506      }
1507      addListener(split(regionInfo, null), (ret, err2) -> {
1508        if (err2 != null) {
1509          future.completeExceptionally(err2);
1510        } else {
1511          future.complete(ret);
1512        }
1513      });
1514    });
1515    return future;
1516  }
1517
1518  @Override
1519  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1520    Preconditions.checkNotNull(splitPoint,
1521      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1522    CompletableFuture<Void> future = new CompletableFuture<>();
1523    addListener(getRegionLocation(regionName), (location, err) -> {
1524      if (err != null) {
1525        future.completeExceptionally(err);
1526        return;
1527      }
1528      RegionInfo regionInfo = location.getRegion();
1529      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1530        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1531          + "Replicas are auto-split when their primary is split."));
1532        return;
1533      }
1534      ServerName serverName = location.getServerName();
1535      if (serverName == null) {
1536        future
1537          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1538        return;
1539      }
1540      if (
1541        regionInfo.getStartKey() != null
1542          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1543      ) {
1544        future.completeExceptionally(
1545          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1546        return;
1547      }
1548      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1549        if (err2 != null) {
1550          future.completeExceptionally(err2);
1551        } else {
1552          future.complete(ret);
1553        }
1554      });
1555    });
1556    return future;
1557  }
1558
1559  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1560    CompletableFuture<Void> future = new CompletableFuture<>();
1561    TableName tableName = hri.getTable();
1562    final SplitTableRegionRequest request;
1563    try {
1564      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1565        ng.newNonce());
1566    } catch (DeserializationException e) {
1567      future.completeExceptionally(e);
1568      return future;
1569    }
1570
1571    addListener(
1572      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1573        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1574      (ret, err2) -> {
1575        if (err2 != null) {
1576          future.completeExceptionally(err2);
1577        } else {
1578          future.complete(ret);
1579        }
1580      });
1581    return future;
1582  }
1583
1584  @Override
1585  public CompletableFuture<Void> assign(byte[] regionName) {
1586    CompletableFuture<Void> future = new CompletableFuture<>();
1587    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1588      if (err != null) {
1589        future.completeExceptionally(err);
1590        return;
1591      }
1592      addListener(
1593        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1594          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1595            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1596            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1597          .call(),
1598        (ret, err2) -> {
1599          if (err2 != null) {
1600            future.completeExceptionally(err2);
1601          } else {
1602            future.complete(ret);
1603          }
1604        });
1605    });
1606    return future;
1607  }
1608
1609  @Override
1610  public CompletableFuture<Void> unassign(byte[] regionName) {
1611    CompletableFuture<Void> future = new CompletableFuture<>();
1612    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1613      if (err != null) {
1614        future.completeExceptionally(err);
1615        return;
1616      }
1617      addListener(
1618        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1619          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1620            Void> call(controller, stub,
1621              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1622              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1623          .call(),
1624        (ret, err2) -> {
1625          if (err2 != null) {
1626            future.completeExceptionally(err2);
1627          } else {
1628            future.complete(ret);
1629          }
1630        });
1631    });
1632    return future;
1633  }
1634
1635  @Override
1636  public CompletableFuture<Void> offline(byte[] regionName) {
1637    CompletableFuture<Void> future = new CompletableFuture<>();
1638    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1639      if (err != null) {
1640        future.completeExceptionally(err);
1641        return;
1642      }
1643      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1644        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1645          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1646          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1647        .call(), (ret, err2) -> {
1648          if (err2 != null) {
1649            future.completeExceptionally(err2);
1650          } else {
1651            future.complete(ret);
1652          }
1653        });
1654    });
1655    return future;
1656  }
1657
1658  @Override
1659  public CompletableFuture<Void> move(byte[] regionName) {
1660    CompletableFuture<Void> future = new CompletableFuture<>();
1661    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1662      if (err != null) {
1663        future.completeExceptionally(err);
1664        return;
1665      }
1666      addListener(
1667        moveRegion(regionInfo,
1668          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1669        (ret, err2) -> {
1670          if (err2 != null) {
1671            future.completeExceptionally(err2);
1672          } else {
1673            future.complete(ret);
1674          }
1675        });
1676    });
1677    return future;
1678  }
1679
1680  @Override
1681  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1682    Preconditions.checkNotNull(destServerName,
1683      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1684    CompletableFuture<Void> future = new CompletableFuture<>();
1685    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1686      if (err != null) {
1687        future.completeExceptionally(err);
1688        return;
1689      }
1690      addListener(
1691        moveRegion(regionInfo, RequestConverter
1692          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1693        (ret, err2) -> {
1694          if (err2 != null) {
1695            future.completeExceptionally(err2);
1696          } else {
1697            future.complete(ret);
1698          }
1699        });
1700    });
1701    return future;
1702  }
1703
1704  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1705    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1706      .action(
1707        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1708          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1709      .call();
1710  }
1711
1712  @Override
1713  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1714    return this.<Void> newMasterCaller()
1715      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1716        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1717        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1718      .call();
1719  }
1720
1721  @Override
1722  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1723    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1724    Scan scan = QuotaTableUtil.makeScan(filter);
1725    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1726      new AdvancedScanResultConsumer() {
1727        List<QuotaSettings> settings = new ArrayList<>();
1728
1729        @Override
1730        public void onNext(Result[] results, ScanController controller) {
1731          for (Result result : results) {
1732            try {
1733              QuotaTableUtil.parseResultToCollection(result, settings);
1734            } catch (IOException e) {
1735              controller.terminate();
1736              future.completeExceptionally(e);
1737            }
1738          }
1739        }
1740
1741        @Override
1742        public void onError(Throwable error) {
1743          future.completeExceptionally(error);
1744        }
1745
1746        @Override
1747        public void onComplete() {
1748          future.complete(settings);
1749        }
1750      });
1751    return future;
1752  }
1753
1754  @Override
1755  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1756    boolean enabled) {
1757    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1758      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1759      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1760      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1761  }
1762
1763  @Override
1764  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1765    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1766      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1767      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1768      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1769  }
1770
1771  @Override
1772  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1773    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1774      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1775      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1776      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1777  }
1778
1779  @Override
1780  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1781    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1782      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1783      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1784      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1785  }
1786
1787  @Override
1788  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1789    return this.<ReplicationPeerConfig> newMasterCaller()
1790      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1791        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1792          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1793          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1794          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1795      .call();
1796  }
1797
1798  @Override
1799  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1800    ReplicationPeerConfig peerConfig) {
1801    return this.<UpdateReplicationPeerConfigRequest,
1802      UpdateReplicationPeerConfigResponse> procedureCall(
1803        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1804        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1805        (resp) -> resp.getProcId(),
1806        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1807  }
1808
1809  @Override
1810  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1811    SyncReplicationState clusterState) {
1812    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1813      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1814        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1815          clusterState),
1816        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1817        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1818          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1819  }
1820
1821  @Override
1822  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1823    Map<TableName, List<String>> tableCfs) {
1824    if (tableCfs == null) {
1825      return failedFuture(new ReplicationException("tableCfs is null"));
1826    }
1827
1828    CompletableFuture<Void> future = new CompletableFuture<Void>();
1829    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1830      if (!completeExceptionally(future, error)) {
1831        ReplicationPeerConfig newPeerConfig =
1832          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1833        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1834          if (!completeExceptionally(future, error)) {
1835            future.complete(result);
1836          }
1837        });
1838      }
1839    });
1840    return future;
1841  }
1842
1843  @Override
1844  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1845    Map<TableName, List<String>> tableCfs) {
1846    if (tableCfs == null) {
1847      return failedFuture(new ReplicationException("tableCfs is null"));
1848    }
1849
1850    CompletableFuture<Void> future = new CompletableFuture<Void>();
1851    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1852      if (!completeExceptionally(future, error)) {
1853        ReplicationPeerConfig newPeerConfig = null;
1854        try {
1855          newPeerConfig = ReplicationPeerConfigUtil
1856            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1857        } catch (ReplicationException e) {
1858          future.completeExceptionally(e);
1859          return;
1860        }
1861        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1862          if (!completeExceptionally(future, error)) {
1863            future.complete(result);
1864          }
1865        });
1866      }
1867    });
1868    return future;
1869  }
1870
1871  @Override
1872  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1873    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1874  }
1875
1876  @Override
1877  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1878    Preconditions.checkNotNull(pattern,
1879      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1880    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1881  }
1882
1883  private CompletableFuture<List<ReplicationPeerDescription>>
1884    listReplicationPeers(ListReplicationPeersRequest request) {
1885    return this.<List<ReplicationPeerDescription>> newMasterCaller()
1886      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1887        List<ReplicationPeerDescription>> call(controller, stub, request,
1888          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1889          (resp) -> resp.getPeerDescList().stream()
1890            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1891            .collect(Collectors.toList())))
1892      .call();
1893  }
1894
1895  @Override
1896  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
1897    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
1898    addListener(listTableDescriptors(), (tables, error) -> {
1899      if (!completeExceptionally(future, error)) {
1900        List<TableCFs> replicatedTableCFs = new ArrayList<>();
1901        tables.forEach(table -> {
1902          Map<String, Integer> cfs = new HashMap<>();
1903          Stream.of(table.getColumnFamilies())
1904            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
1905            .forEach(column -> {
1906              cfs.put(column.getNameAsString(), column.getScope());
1907            });
1908          if (!cfs.isEmpty()) {
1909            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
1910          }
1911        });
1912        future.complete(replicatedTableCFs);
1913      }
1914    });
1915    return future;
1916  }
1917
1918  @Override
1919  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
1920    SnapshotProtos.SnapshotDescription snapshot =
1921      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
1922    try {
1923      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
1924    } catch (IllegalArgumentException e) {
1925      return failedFuture(e);
1926    }
1927    CompletableFuture<Void> future = new CompletableFuture<>();
1928    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
1929      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
1930    addListener(this.<SnapshotResponse> newMasterCaller()
1931      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
1932        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
1933      .call(), (resp, err) -> {
1934        if (err != null) {
1935          future.completeExceptionally(err);
1936          return;
1937        }
1938        waitSnapshotFinish(snapshotDesc, future, resp);
1939      });
1940    return future;
1941  }
1942
1943  // This is for keeping compatibility with old implementation.
1944  // If there is a procId field in the response, then the snapshot will be operated with a
1945  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
1946  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
1947    SnapshotResponse resp) {
1948    if (resp.hasProcId()) {
1949      getProcedureResult(resp.getProcId(), future, 0);
1950      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
1951    } else {
1952      long expectedTimeout = resp.getExpectedTimeout();
1953      TimerTask pollingTask = new TimerTask() {
1954        int tries = 0;
1955        long startTime = EnvironmentEdgeManager.currentTime();
1956        long endTime = startTime + expectedTimeout;
1957        long maxPauseTime = expectedTimeout / maxAttempts;
1958
1959        @Override
1960        public void run(Timeout timeout) throws Exception {
1961          if (EnvironmentEdgeManager.currentTime() < endTime) {
1962            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
1963              if (err2 != null) {
1964                future.completeExceptionally(err2);
1965              } else if (done) {
1966                future.complete(null);
1967              } else {
1968                // retry again after pauseTime.
1969                long pauseTime =
1970                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
1971                pauseTime = Math.min(pauseTime, maxPauseTime);
1972                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
1973              }
1974            });
1975          } else {
1976            future
1977              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
1978                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
1979          }
1980        }
1981      };
1982      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
1983    }
1984  }
1985
1986  @Override
1987  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
1988    return this.<Boolean> newMasterCaller()
1989      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
1990        Boolean> call(controller, stub,
1991          IsSnapshotDoneRequest.newBuilder()
1992            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
1993          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
1994      .call();
1995  }
1996
1997  @Override
1998  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
1999    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2000      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2001      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2002    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2003  }
2004
2005  @Override
2006  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2007    boolean restoreAcl) {
2008    CompletableFuture<Void> future = new CompletableFuture<>();
2009    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2010      if (err != null) {
2011        future.completeExceptionally(err);
2012        return;
2013      }
2014      TableName tableName = null;
2015      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2016        for (SnapshotDescription snap : snapshotDescriptions) {
2017          if (snap.getName().equals(snapshotName)) {
2018            tableName = snap.getTableName();
2019            break;
2020          }
2021        }
2022      }
2023      if (tableName == null) {
2024        future.completeExceptionally(new RestoreSnapshotException(
2025          "Unable to find the table name for snapshot=" + snapshotName));
2026        return;
2027      }
2028      final TableName finalTableName = tableName;
2029      addListener(tableExists(finalTableName), (exists, err2) -> {
2030        if (err2 != null) {
2031          future.completeExceptionally(err2);
2032        } else if (!exists) {
2033          // if table does not exist, then just clone snapshot into new table.
2034          completeConditionalOnFuture(future,
2035            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2036        } else {
2037          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2038            if (err4 != null) {
2039              future.completeExceptionally(err4);
2040            } else if (!disabled) {
2041              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2042            } else {
2043              completeConditionalOnFuture(future,
2044                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2045            }
2046          });
2047        }
2048      });
2049    });
2050    return future;
2051  }
2052
2053  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2054    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2055    if (takeFailSafeSnapshot) {
2056      CompletableFuture<Void> future = new CompletableFuture<>();
2057      // Step.1 Take a snapshot of the current state
2058      String failSafeSnapshotSnapshotNameFormat =
2059        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2060          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2061      final String failSafeSnapshotSnapshotName =
2062        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2063          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2064          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2065      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2066      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2067        if (err != null) {
2068          future.completeExceptionally(err);
2069        } else {
2070          // Step.2 Restore snapshot
2071          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2072            (void2, err2) -> {
2073              if (err2 != null) {
2074                // Step.3.a Something went wrong during the restore and try to rollback.
2075                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2076                  restoreAcl, null), (void3, err3) -> {
2077                    if (err3 != null) {
2078                      future.completeExceptionally(err3);
2079                    } else {
2080                      String msg =
2081                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
2082                          + failSafeSnapshotSnapshotName + " succeeded.";
2083                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2084                    }
2085                  });
2086              } else {
2087                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2088                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2089                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2090                  if (err3 != null) {
2091                    LOG.error(
2092                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2093                      err3);
2094                  }
2095                  future.complete(ret3);
2096                });
2097              }
2098            });
2099        }
2100      });
2101      return future;
2102    } else {
2103      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2104    }
2105  }
2106
2107  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2108    CompletableFuture<T> parentFuture) {
2109    addListener(parentFuture, (res, err) -> {
2110      if (err != null) {
2111        dependentFuture.completeExceptionally(err);
2112      } else {
2113        dependentFuture.complete(res);
2114      }
2115    });
2116  }
2117
2118  @Override
2119  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2120    boolean restoreAcl, String customSFT) {
2121    CompletableFuture<Void> future = new CompletableFuture<>();
2122    addListener(tableExists(tableName), (exists, err) -> {
2123      if (err != null) {
2124        future.completeExceptionally(err);
2125      } else if (exists) {
2126        future.completeExceptionally(new TableExistsException(tableName));
2127      } else {
2128        completeConditionalOnFuture(future,
2129          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2130      }
2131    });
2132    return future;
2133  }
2134
2135  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2136    boolean restoreAcl, String customSFT) {
2137    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2138      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2139    try {
2140      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2141    } catch (IllegalArgumentException e) {
2142      return failedFuture(e);
2143    }
2144    RestoreSnapshotRequest.Builder builder =
2145      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2146        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2147    if (customSFT != null) {
2148      builder.setCustomSFT(customSFT);
2149    }
2150    return waitProcedureResult(this.<Long> newMasterCaller()
2151      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2152        Long> call(controller, stub, builder.build(),
2153          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2154      .call());
2155  }
2156
2157  @Override
2158  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2159    return getCompletedSnapshots(null);
2160  }
2161
2162  @Override
2163  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2164    Preconditions.checkNotNull(pattern,
2165      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2166    return getCompletedSnapshots(pattern);
2167  }
2168
2169  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2170    return this.<List<SnapshotDescription>> newMasterCaller()
2171      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2172        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2173          GetCompletedSnapshotsRequest.newBuilder().build(),
2174          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2175          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2176      .call();
2177  }
2178
2179  @Override
2180  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2181    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2182      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2183    return getCompletedSnapshots(tableNamePattern, null);
2184  }
2185
2186  @Override
2187  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2188    Pattern snapshotNamePattern) {
2189    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2190      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2191    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2192      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2193    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2194  }
2195
2196  private CompletableFuture<List<SnapshotDescription>>
2197    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2198    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2199    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2200      if (err != null) {
2201        future.completeExceptionally(err);
2202        return;
2203      }
2204      if (tableNames == null || tableNames.size() <= 0) {
2205        future.complete(Collections.emptyList());
2206        return;
2207      }
2208      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2209        if (err2 != null) {
2210          future.completeExceptionally(err2);
2211          return;
2212        }
2213        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2214          future.complete(Collections.emptyList());
2215          return;
2216        }
2217        future.complete(snapshotDescList.stream()
2218          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2219          .collect(Collectors.toList()));
2220      });
2221    });
2222    return future;
2223  }
2224
2225  @Override
2226  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2227    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2228  }
2229
2230  @Override
2231  public CompletableFuture<Void> deleteSnapshots() {
2232    return internalDeleteSnapshots(null, null);
2233  }
2234
2235  @Override
2236  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2237    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2238      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2239    return internalDeleteSnapshots(null, snapshotNamePattern);
2240  }
2241
2242  @Override
2243  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2244    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2245      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2246    return internalDeleteSnapshots(tableNamePattern, null);
2247  }
2248
2249  @Override
2250  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2251    Pattern snapshotNamePattern) {
2252    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2253      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2254    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2255      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2256    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2257  }
2258
2259  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2260    Pattern snapshotNamePattern) {
2261    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2262    if (tableNamePattern == null) {
2263      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2264    } else {
2265      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2266    }
2267    CompletableFuture<Void> future = new CompletableFuture<>();
2268    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2269      if (err != null) {
2270        future.completeExceptionally(err);
2271        return;
2272      }
2273      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2274        future.complete(null);
2275        return;
2276      }
2277      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2278        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2279          if (e != null) {
2280            future.completeExceptionally(e);
2281          } else {
2282            future.complete(v);
2283          }
2284        });
2285    });
2286    return future;
2287  }
2288
2289  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2290    return this.<Void> newMasterCaller()
2291      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2292        controller, stub,
2293        DeleteSnapshotRequest.newBuilder()
2294          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2295        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2296      .call();
2297  }
2298
2299  @Override
2300  public CompletableFuture<Void> execProcedure(String signature, String instance,
2301    Map<String, String> props) {
2302    CompletableFuture<Void> future = new CompletableFuture<>();
2303    ProcedureDescription procDesc =
2304      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2305    addListener(this.<Long> newMasterCaller()
2306      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2307        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2308        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2309      .call(), (expectedTimeout, err) -> {
2310        if (err != null) {
2311          future.completeExceptionally(err);
2312          return;
2313        }
2314        TimerTask pollingTask = new TimerTask() {
2315          int tries = 0;
2316          long startTime = EnvironmentEdgeManager.currentTime();
2317          long endTime = startTime + expectedTimeout;
2318          long maxPauseTime = expectedTimeout / maxAttempts;
2319
2320          @Override
2321          public void run(Timeout timeout) throws Exception {
2322            if (EnvironmentEdgeManager.currentTime() < endTime) {
2323              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2324                if (err2 != null) {
2325                  future.completeExceptionally(err2);
2326                  return;
2327                }
2328                if (done) {
2329                  future.complete(null);
2330                } else {
2331                  // retry again after pauseTime.
2332                  long pauseTime =
2333                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2334                  pauseTime = Math.min(pauseTime, maxPauseTime);
2335                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2336                    TimeUnit.MICROSECONDS);
2337                }
2338              });
2339            } else {
2340              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2341                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2342            }
2343          }
2344        };
2345        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2346        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2347      });
2348    return future;
2349  }
2350
2351  @Override
2352  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2353    Map<String, String> props) {
2354    ProcedureDescription proDesc =
2355      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2356    return this.<byte[]> newMasterCaller()
2357      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2358        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2359        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2360        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2361      .call();
2362  }
2363
2364  @Override
2365  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2366    Map<String, String> props) {
2367    ProcedureDescription proDesc =
2368      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2369    return this.<Boolean> newMasterCaller()
2370      .action(
2371        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2372          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2373          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2374      .call();
2375  }
2376
2377  @Override
2378  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2379    return this.<Boolean> newMasterCaller().action(
2380      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2381        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2382        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2383      .call();
2384  }
2385
2386  @Override
2387  public CompletableFuture<String> getProcedures() {
2388    return this.<String> newMasterCaller()
2389      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2390        controller, stub, GetProceduresRequest.newBuilder().build(),
2391        (s, c, req, done) -> s.getProcedures(c, req, done),
2392        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2393      .call();
2394  }
2395
2396  @Override
2397  public CompletableFuture<String> getLocks() {
2398    return this.<String> newMasterCaller()
2399      .action(
2400        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2401          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2402          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2403      .call();
2404  }
2405
2406  @Override
2407  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2408    boolean offload) {
2409    return this.<Void> newMasterCaller()
2410      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2411        DecommissionRegionServersResponse, Void> call(controller, stub,
2412          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2413          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2414      .call();
2415  }
2416
2417  @Override
2418  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2419    return this.<List<ServerName>> newMasterCaller()
2420      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2421        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2422          ListDecommissionedRegionServersRequest.newBuilder().build(),
2423          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2424          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2425            .collect(Collectors.toList())))
2426      .call();
2427  }
2428
2429  @Override
2430  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2431    List<byte[]> encodedRegionNames) {
2432    return this.<Void> newMasterCaller()
2433      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2434        RecommissionRegionServerResponse, Void> call(controller, stub,
2435          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2436          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2437      .call();
2438  }
2439
2440  /**
2441   * Get the region location for the passed region name. The region name may be a full region name
2442   * or encoded region name. If the region does not found, then it'll throw an
2443   * UnknownRegionException wrapped by a {@link CompletableFuture}
2444   * @param regionNameOrEncodedRegionName region name or encoded region name
2445   * @return region location, wrapped by a {@link CompletableFuture}
2446   */
2447  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2448    if (regionNameOrEncodedRegionName == null) {
2449      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2450    }
2451
2452    CompletableFuture<Optional<HRegionLocation>> future;
2453    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2454      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2455      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2456        // old format encodedName, should be meta region
2457        future = connection.registry.getMetaRegionLocations()
2458          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2459            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2460      } else {
2461        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2462          regionNameOrEncodedRegionName);
2463      }
2464    } else {
2465      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2466      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2467      RegionInfo regionInfo;
2468      try {
2469        regionInfo =
2470          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2471      } catch (IOException ioe) {
2472        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2473      }
2474
2475      if (regionInfo.isMetaRegion()) {
2476        future = connection.registry.getMetaRegionLocations()
2477          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2478            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2479            .findFirst());
2480      } else {
2481        future =
2482          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2483      }
2484    }
2485
2486    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2487    addListener(future, (location, err) -> {
2488      if (err != null) {
2489        returnedFuture.completeExceptionally(err);
2490        return;
2491      }
2492      if (!location.isPresent() || location.get().getRegion() == null) {
2493        returnedFuture.completeExceptionally(
2494          new UnknownRegionException("Invalid region name or encoded region name: "
2495            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2496      } else {
2497        returnedFuture.complete(location.get());
2498      }
2499    });
2500    return returnedFuture;
2501  }
2502
2503  /**
2504   * Get the region info for the passed region name. The region name may be a full region name or
2505   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2506   * wrapped by a {@link CompletableFuture}
2507   * @return region info, wrapped by a {@link CompletableFuture}
2508   */
2509  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2510    if (regionNameOrEncodedRegionName == null) {
2511      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2512    }
2513
2514    if (
2515      Bytes.equals(regionNameOrEncodedRegionName,
2516        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2517        || Bytes.equals(regionNameOrEncodedRegionName,
2518          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2519    ) {
2520      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2521    }
2522
2523    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2524    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2525      if (err != null) {
2526        future.completeExceptionally(err);
2527      } else {
2528        future.complete(location.getRegion());
2529      }
2530    });
2531    return future;
2532  }
2533
2534  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2535    if (numRegions < 3) {
2536      throw new IllegalArgumentException("Must create at least three regions");
2537    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2538      throw new IllegalArgumentException("Start key must be smaller than end key");
2539    }
2540    if (numRegions == 3) {
2541      return new byte[][] { startKey, endKey };
2542    }
2543    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2544    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2545      throw new IllegalArgumentException("Unable to split key range into enough regions");
2546    }
2547    return splitKeys;
2548  }
2549
2550  private void verifySplitKeys(byte[][] splitKeys) {
2551    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2552    // Verify there are no duplicate split keys
2553    byte[] lastKey = null;
2554    for (byte[] splitKey : splitKeys) {
2555      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2556        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2557      }
2558      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2559        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2560          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2561      }
2562      lastKey = splitKey;
2563    }
2564  }
2565
2566  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2567
2568    abstract void onFinished();
2569
2570    abstract void onError(Throwable error);
2571
2572    @Override
2573    public void accept(Void v, Throwable error) {
2574      if (error != null) {
2575        onError(error);
2576        return;
2577      }
2578      onFinished();
2579    }
2580  }
2581
2582  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2583    protected final TableName tableName;
2584
2585    TableProcedureBiConsumer(TableName tableName) {
2586      this.tableName = tableName;
2587    }
2588
2589    abstract String getOperationType();
2590
2591    String getDescription() {
2592      return "Operation: " + getOperationType() + ", " + "Table Name: "
2593        + tableName.getNameWithNamespaceInclAsString();
2594    }
2595
2596    @Override
2597    void onFinished() {
2598      LOG.info(getDescription() + " completed");
2599    }
2600
2601    @Override
2602    void onError(Throwable error) {
2603      LOG.info(getDescription() + " failed with " + error.getMessage());
2604    }
2605  }
2606
2607  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2608    protected final String namespaceName;
2609
2610    NamespaceProcedureBiConsumer(String namespaceName) {
2611      this.namespaceName = namespaceName;
2612    }
2613
2614    abstract String getOperationType();
2615
2616    String getDescription() {
2617      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2618    }
2619
2620    @Override
2621    void onFinished() {
2622      LOG.info(getDescription() + " completed");
2623    }
2624
2625    @Override
2626    void onError(Throwable error) {
2627      LOG.info(getDescription() + " failed with " + error.getMessage());
2628    }
2629  }
2630
2631  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2632
2633    CreateTableProcedureBiConsumer(TableName tableName) {
2634      super(tableName);
2635    }
2636
2637    @Override
2638    String getOperationType() {
2639      return "CREATE";
2640    }
2641  }
2642
2643  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2644
2645    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2646      super(tableName);
2647    }
2648
2649    @Override
2650    String getOperationType() {
2651      return "MODIFY";
2652    }
2653  }
2654
2655  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2656    extends TableProcedureBiConsumer {
2657
2658    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2659      super(tableName);
2660    }
2661
2662    @Override
2663    String getOperationType() {
2664      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2665    }
2666  }
2667
2668  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2669
2670    DeleteTableProcedureBiConsumer(TableName tableName) {
2671      super(tableName);
2672    }
2673
2674    @Override
2675    String getOperationType() {
2676      return "DELETE";
2677    }
2678
2679    @Override
2680    void onFinished() {
2681      connection.getLocator().clearCache(this.tableName);
2682      super.onFinished();
2683    }
2684  }
2685
2686  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2687
2688    TruncateTableProcedureBiConsumer(TableName tableName) {
2689      super(tableName);
2690    }
2691
2692    @Override
2693    String getOperationType() {
2694      return "TRUNCATE";
2695    }
2696  }
2697
2698  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2699
2700    EnableTableProcedureBiConsumer(TableName tableName) {
2701      super(tableName);
2702    }
2703
2704    @Override
2705    String getOperationType() {
2706      return "ENABLE";
2707    }
2708  }
2709
2710  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2711
2712    DisableTableProcedureBiConsumer(TableName tableName) {
2713      super(tableName);
2714    }
2715
2716    @Override
2717    String getOperationType() {
2718      return "DISABLE";
2719    }
2720  }
2721
2722  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2723
2724    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2725      super(tableName);
2726    }
2727
2728    @Override
2729    String getOperationType() {
2730      return "ADD_COLUMN_FAMILY";
2731    }
2732  }
2733
2734  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2735
2736    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2737      super(tableName);
2738    }
2739
2740    @Override
2741    String getOperationType() {
2742      return "DELETE_COLUMN_FAMILY";
2743    }
2744  }
2745
2746  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2747
2748    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2749      super(tableName);
2750    }
2751
2752    @Override
2753    String getOperationType() {
2754      return "MODIFY_COLUMN_FAMILY";
2755    }
2756  }
2757
2758  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2759    extends TableProcedureBiConsumer {
2760
2761    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2762      super(tableName);
2763    }
2764
2765    @Override
2766    String getOperationType() {
2767      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2768    }
2769  }
2770
2771  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2772
2773    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2774      super(namespaceName);
2775    }
2776
2777    @Override
2778    String getOperationType() {
2779      return "CREATE_NAMESPACE";
2780    }
2781  }
2782
2783  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2784
2785    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2786      super(namespaceName);
2787    }
2788
2789    @Override
2790    String getOperationType() {
2791      return "DELETE_NAMESPACE";
2792    }
2793  }
2794
2795  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2796
2797    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2798      super(namespaceName);
2799    }
2800
2801    @Override
2802    String getOperationType() {
2803      return "MODIFY_NAMESPACE";
2804    }
2805  }
2806
2807  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2808
2809    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2810      super(tableName);
2811    }
2812
2813    @Override
2814    String getOperationType() {
2815      return "MERGE_REGIONS";
2816    }
2817  }
2818
2819  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2820
2821    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2822      super(tableName);
2823    }
2824
2825    @Override
2826    String getOperationType() {
2827      return "SPLIT_REGION";
2828    }
2829  }
2830
2831  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
2832    SnapshotProcedureBiConsumer(TableName tableName) {
2833      super(tableName);
2834    }
2835
2836    @Override
2837    String getOperationType() {
2838      return "SNAPSHOT";
2839    }
2840  }
2841
2842  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2843    private final String peerId;
2844    private final Supplier<String> getOperation;
2845
2846    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2847      this.peerId = peerId;
2848      this.getOperation = getOperation;
2849    }
2850
2851    String getDescription() {
2852      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2853    }
2854
2855    @Override
2856    void onFinished() {
2857      LOG.info(getDescription() + " completed");
2858    }
2859
2860    @Override
2861    void onError(Throwable error) {
2862      LOG.info(getDescription() + " failed with " + error.getMessage());
2863    }
2864  }
2865
2866  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2867    CompletableFuture<Void> future = new CompletableFuture<>();
2868    addListener(procFuture, (procId, error) -> {
2869      if (error != null) {
2870        future.completeExceptionally(error);
2871        return;
2872      }
2873      getProcedureResult(procId, future, 0);
2874    });
2875    return future;
2876  }
2877
2878  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
2879    addListener(
2880      this.<GetProcedureResultResponse> newMasterCaller()
2881        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
2882          GetProcedureResultResponse> call(controller, stub,
2883            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
2884            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
2885        .call(),
2886      (response, error) -> {
2887        if (error != null) {
2888          LOG.warn("failed to get the procedure result procId={}", procId,
2889            ConnectionUtils.translateException(error));
2890          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2891            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2892          return;
2893        }
2894        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
2895          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
2896            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
2897          return;
2898        }
2899        if (response.hasException()) {
2900          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
2901          future.completeExceptionally(ioe);
2902        } else {
2903          future.complete(null);
2904        }
2905      });
2906  }
2907
2908  private <T> CompletableFuture<T> failedFuture(Throwable error) {
2909    CompletableFuture<T> future = new CompletableFuture<>();
2910    future.completeExceptionally(error);
2911    return future;
2912  }
2913
2914  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
2915    if (error != null) {
2916      future.completeExceptionally(error);
2917      return true;
2918    }
2919    return false;
2920  }
2921
2922  @Override
2923  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
2924    return getClusterMetrics(EnumSet.allOf(Option.class));
2925  }
2926
2927  @Override
2928  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
2929    return this.<ClusterMetrics> newMasterCaller()
2930      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
2931        ClusterMetrics> call(controller, stub,
2932          RequestConverter.buildGetClusterStatusRequest(options),
2933          (s, c, req, done) -> s.getClusterStatus(c, req, done),
2934          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
2935      .call();
2936  }
2937
2938  @Override
2939  public CompletableFuture<Void> shutdown() {
2940    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2941      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
2942        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
2943        resp -> null))
2944      .call();
2945  }
2946
2947  @Override
2948  public CompletableFuture<Void> stopMaster() {
2949    return this.<Void> newMasterCaller().priority(HIGH_QOS)
2950      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
2951        controller, stub, StopMasterRequest.newBuilder().build(),
2952        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
2953      .call();
2954  }
2955
2956  @Override
2957  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
2958    StopServerRequest request = RequestConverter
2959      .buildStopServerRequest("Called by admin client " + this.connection.toString());
2960    return this.<Void> newAdminCaller().priority(HIGH_QOS)
2961      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
2962        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
2963        resp -> null))
2964      .serverName(serverName).call();
2965  }
2966
2967  @Override
2968  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
2969    return this.<Void> newAdminCaller()
2970      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
2971        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
2972          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
2973      .serverName(serverName).call();
2974  }
2975
2976  @Override
2977  public CompletableFuture<Void> updateConfiguration() {
2978    CompletableFuture<Void> future = new CompletableFuture<Void>();
2979    addListener(
2980      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
2981      (status, err) -> {
2982        if (err != null) {
2983          future.completeExceptionally(err);
2984        } else {
2985          List<CompletableFuture<Void>> futures = new ArrayList<>();
2986          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
2987          futures.add(updateConfiguration(status.getMasterName()));
2988          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
2989          addListener(
2990            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
2991            (result, err2) -> {
2992              if (err2 != null) {
2993                future.completeExceptionally(err2);
2994              } else {
2995                future.complete(result);
2996              }
2997            });
2998        }
2999      });
3000    return future;
3001  }
3002
3003  @Override
3004  public CompletableFuture<Void> updateConfiguration(String groupName) {
3005    CompletableFuture<Void> future = new CompletableFuture<Void>();
3006    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3007      if (err != null) {
3008        future.completeExceptionally(err);
3009      } else if (rsGroupInfo == null) {
3010        future.completeExceptionally(
3011          new IllegalArgumentException("Group does not exist: " + groupName));
3012      } else {
3013        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3014          if (err2 != null) {
3015            future.completeExceptionally(err2);
3016          } else {
3017            List<CompletableFuture<Void>> futures = new ArrayList<>();
3018            List<ServerName> groupServers = status.getServersName().stream()
3019              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3020            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3021            addListener(
3022              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3023              (result, err3) -> {
3024                if (err3 != null) {
3025                  future.completeExceptionally(err3);
3026                } else {
3027                  future.complete(result);
3028                }
3029              });
3030          }
3031        });
3032      }
3033    });
3034    return future;
3035  }
3036
3037  @Override
3038  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3039    return this.<Void> newAdminCaller()
3040      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3041        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3042          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3043      .serverName(serverName).call();
3044  }
3045
3046  @Override
3047  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3048    return this.<Void> newAdminCaller()
3049      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3050        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3051          RequestConverter.buildClearCompactionQueuesRequest(queues),
3052          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3053      .serverName(serverName).call();
3054  }
3055
3056  @Override
3057  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3058    return this.<List<SecurityCapability>> newMasterCaller()
3059      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3060        List<SecurityCapability>> call(controller, stub,
3061          SecurityCapabilitiesRequest.newBuilder().build(),
3062          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3063          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3064      .call();
3065  }
3066
3067  @Override
3068  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3069    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3070  }
3071
3072  @Override
3073  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3074    TableName tableName) {
3075    Preconditions.checkNotNull(tableName,
3076      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3077    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3078  }
3079
3080  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3081    ServerName serverName) {
3082    return this.<List<RegionMetrics>> newAdminCaller()
3083      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3084        List<RegionMetrics>> adminCall(controller, stub, request,
3085          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3086          RegionMetricsBuilder::toRegionMetrics))
3087      .serverName(serverName).call();
3088  }
3089
3090  @Override
3091  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3092    return this.<Boolean> newMasterCaller()
3093      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3094        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3095          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3096          resp -> resp.getInMaintenanceMode()))
3097      .call();
3098  }
3099
3100  @Override
3101  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3102    CompactType compactType) {
3103    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3104
3105    switch (compactType) {
3106      case MOB:
3107        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3108          if (err != null) {
3109            future.completeExceptionally(err);
3110            return;
3111          }
3112          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3113
3114          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3115            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3116              GetRegionInfoResponse> adminCall(controller, stub,
3117                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3118                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3119            .call(), (resp2, err2) -> {
3120              if (err2 != null) {
3121                future.completeExceptionally(err2);
3122              } else {
3123                if (resp2.hasCompactionState()) {
3124                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3125                } else {
3126                  future.complete(CompactionState.NONE);
3127                }
3128              }
3129            });
3130        });
3131        break;
3132      case NORMAL:
3133        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3134          if (err != null) {
3135            future.completeExceptionally(err);
3136            return;
3137          }
3138          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3139          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3140          locations.stream().filter(loc -> loc.getServerName() != null)
3141            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3142            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3143              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3144                // If any region compaction state is MAJOR_AND_MINOR
3145                // the table compaction state is MAJOR_AND_MINOR, too.
3146                if (err2 != null) {
3147                  future.completeExceptionally(unwrapCompletionException(err2));
3148                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3149                  future.complete(regionState);
3150                } else {
3151                  regionStates.add(regionState);
3152                }
3153              }));
3154            });
3155          addListener(
3156            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3157            (ret, err3) -> {
3158              // If future not completed, check all regions's compaction state
3159              if (!future.isCompletedExceptionally() && !future.isDone()) {
3160                CompactionState state = CompactionState.NONE;
3161                for (CompactionState regionState : regionStates) {
3162                  switch (regionState) {
3163                    case MAJOR:
3164                      if (state == CompactionState.MINOR) {
3165                        future.complete(CompactionState.MAJOR_AND_MINOR);
3166                      } else {
3167                        state = CompactionState.MAJOR;
3168                      }
3169                      break;
3170                    case MINOR:
3171                      if (state == CompactionState.MAJOR) {
3172                        future.complete(CompactionState.MAJOR_AND_MINOR);
3173                      } else {
3174                        state = CompactionState.MINOR;
3175                      }
3176                      break;
3177                    case NONE:
3178                    default:
3179                  }
3180                }
3181                if (!future.isDone()) {
3182                  future.complete(state);
3183                }
3184              }
3185            });
3186        });
3187        break;
3188      default:
3189        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3190    }
3191
3192    return future;
3193  }
3194
3195  @Override
3196  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3197    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3198    addListener(getRegionLocation(regionName), (location, err) -> {
3199      if (err != null) {
3200        future.completeExceptionally(err);
3201        return;
3202      }
3203      ServerName serverName = location.getServerName();
3204      if (serverName == null) {
3205        future
3206          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3207        return;
3208      }
3209      addListener(
3210        this.<GetRegionInfoResponse> newAdminCaller()
3211          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3212            GetRegionInfoResponse> adminCall(controller, stub,
3213              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3214                true),
3215              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3216          .serverName(serverName).call(),
3217        (resp2, err2) -> {
3218          if (err2 != null) {
3219            future.completeExceptionally(err2);
3220          } else {
3221            if (resp2.hasCompactionState()) {
3222              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3223            } else {
3224              future.complete(CompactionState.NONE);
3225            }
3226          }
3227        });
3228    });
3229    return future;
3230  }
3231
3232  @Override
3233  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3234    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3235      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3236    return this.<Optional<Long>> newMasterCaller()
3237      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3238        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3239          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3240          ProtobufUtil::toOptionalTimestamp))
3241      .call();
3242  }
3243
3244  @Override
3245  public CompletableFuture<Optional<Long>>
3246    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3247    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3248    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3249    addListener(getRegionInfo(regionName), (region, err) -> {
3250      if (err != null) {
3251        future.completeExceptionally(err);
3252        return;
3253      }
3254      MajorCompactionTimestampForRegionRequest.Builder builder =
3255        MajorCompactionTimestampForRegionRequest.newBuilder();
3256      builder.setRegion(
3257        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3258      addListener(this.<Optional<Long>> newMasterCaller()
3259        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3260          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3261            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3262            ProtobufUtil::toOptionalTimestamp))
3263        .call(), (timestamp, err2) -> {
3264          if (err2 != null) {
3265            future.completeExceptionally(err2);
3266          } else {
3267            future.complete(timestamp);
3268          }
3269        });
3270    });
3271    return future;
3272  }
3273
3274  @Override
3275  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3276    List<String> serverNamesList) {
3277    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3278    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3279      if (err != null) {
3280        future.completeExceptionally(err);
3281        return;
3282      }
3283      // Accessed by multiple threads.
3284      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3285      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3286      serverNames.stream().forEach(serverName -> {
3287        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3288          if (err2 != null) {
3289            future.completeExceptionally(unwrapCompletionException(err2));
3290          } else {
3291            serverStates.put(serverName, serverState);
3292          }
3293        }));
3294      });
3295      addListener(
3296        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3297        (ret, err3) -> {
3298          if (!future.isCompletedExceptionally()) {
3299            if (err3 != null) {
3300              future.completeExceptionally(err3);
3301            } else {
3302              future.complete(serverStates);
3303            }
3304          }
3305        });
3306    });
3307    return future;
3308  }
3309
3310  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3311    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3312    if (serverNamesList.isEmpty()) {
3313      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3314        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3315      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3316        if (err != null) {
3317          future.completeExceptionally(err);
3318        } else {
3319          future.complete(clusterMetrics.getServersName());
3320        }
3321      });
3322      return future;
3323    } else {
3324      List<ServerName> serverList = new ArrayList<>();
3325      for (String regionServerName : serverNamesList) {
3326        ServerName serverName = null;
3327        try {
3328          serverName = ServerName.valueOf(regionServerName);
3329        } catch (Exception e) {
3330          future.completeExceptionally(
3331            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3332        }
3333        if (serverName == null) {
3334          future.completeExceptionally(
3335            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3336        } else {
3337          serverList.add(serverName);
3338        }
3339      }
3340      future.complete(serverList);
3341    }
3342    return future;
3343  }
3344
3345  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3346    return this.<Boolean> newAdminCaller().serverName(serverName)
3347      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3348        Boolean> adminCall(controller, stub,
3349          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3350          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3351      .call();
3352  }
3353
3354  @Override
3355  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3356    return this.<Boolean> newMasterCaller()
3357      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3358        Boolean> call(controller, stub,
3359          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3360          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3361          (resp) -> resp.getPrevBalanceValue()))
3362      .call();
3363  }
3364
3365  @Override
3366  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3367    return this.<BalanceResponse> newMasterCaller()
3368      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3369        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3370          (s, c, req, done) -> s.balance(c, req, done),
3371          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3372      .call();
3373  }
3374
3375  @Override
3376  public CompletableFuture<Boolean> isBalancerEnabled() {
3377    return this.<Boolean> newMasterCaller()
3378      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3379        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3380          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3381      .call();
3382  }
3383
3384  @Override
3385  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3386    return this.<Boolean> newMasterCaller()
3387      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3388        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3389          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3390          (resp) -> resp.getPrevNormalizerValue()))
3391      .call();
3392  }
3393
3394  @Override
3395  public CompletableFuture<Boolean> isNormalizerEnabled() {
3396    return this.<Boolean> newMasterCaller()
3397      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3398        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3399          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3400      .call();
3401  }
3402
3403  @Override
3404  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3405    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3406  }
3407
3408  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3409    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3410      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3411  }
3412
3413  @Override
3414  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3415    return this.<Boolean> newMasterCaller()
3416      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3417        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3418          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3419          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3420          (resp) -> resp.getPrevValue()))
3421      .call();
3422  }
3423
3424  @Override
3425  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3426    return this.<Boolean> newMasterCaller()
3427      .action(
3428        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3429          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3430            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3431      .call();
3432  }
3433
3434  @Override
3435  public CompletableFuture<Boolean> runCleanerChore() {
3436    return this.<Boolean> newMasterCaller()
3437      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3438        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3439          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3440          (resp) -> resp.getCleanerChoreRan()))
3441      .call();
3442  }
3443
3444  @Override
3445  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3446    return this.<Boolean> newMasterCaller()
3447      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3448        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3449          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3450      .call();
3451  }
3452
3453  @Override
3454  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3455    return this.<Boolean> newMasterCaller()
3456      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3457        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3458          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3459          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3460      .call();
3461  }
3462
3463  @Override
3464  public CompletableFuture<Integer> runCatalogJanitor() {
3465    return this.<Integer> newMasterCaller()
3466      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3467        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3468          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3469      .call();
3470  }
3471
3472  @Override
3473  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3474    ServiceCaller<S, R> callable) {
3475    MasterCoprocessorRpcChannelImpl channel =
3476      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3477    S stub = stubMaker.apply(channel);
3478    CompletableFuture<R> future = new CompletableFuture<>();
3479    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3480    callable.call(stub, controller, resp -> {
3481      if (controller.failed()) {
3482        future.completeExceptionally(controller.getFailed());
3483      } else {
3484        future.complete(resp);
3485      }
3486    });
3487    return future;
3488  }
3489
3490  @Override
3491  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3492    ServiceCaller<S, R> callable, ServerName serverName) {
3493    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3494      this.<Message> newServerCaller().serverName(serverName));
3495    S stub = stubMaker.apply(channel);
3496    CompletableFuture<R> future = new CompletableFuture<>();
3497    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3498    callable.call(stub, controller, resp -> {
3499      if (controller.failed()) {
3500        future.completeExceptionally(controller.getFailed());
3501      } else {
3502        future.complete(resp);
3503      }
3504    });
3505    return future;
3506  }
3507
3508  @Override
3509  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3510    return this.<List<ServerName>> newMasterCaller()
3511      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3512        List<ServerName>> call(controller, stub,
3513          RequestConverter.buildClearDeadServersRequest(servers),
3514          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3515          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3516      .call();
3517  }
3518
3519  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3520    return this.connection.callerFactory.<T> serverRequest()
3521      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3522      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3523      .pause(pauseNs, TimeUnit.NANOSECONDS)
3524      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3525      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3526  }
3527
3528  @Override
3529  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3530    if (tableName == null) {
3531      return failedFuture(new IllegalArgumentException("Table name is null"));
3532    }
3533    CompletableFuture<Void> future = new CompletableFuture<>();
3534    addListener(tableExists(tableName), (exist, err) -> {
3535      if (err != null) {
3536        future.completeExceptionally(err);
3537        return;
3538      }
3539      if (!exist) {
3540        future.completeExceptionally(new TableNotFoundException(
3541          "Table '" + tableName.getNameAsString() + "' does not exists."));
3542        return;
3543      }
3544      addListener(getTableSplits(tableName), (splits, err1) -> {
3545        if (err1 != null) {
3546          future.completeExceptionally(err1);
3547        } else {
3548          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3549            if (err2 != null) {
3550              future.completeExceptionally(err2);
3551            } else {
3552              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3553                if (err3 != null) {
3554                  future.completeExceptionally(err3);
3555                } else {
3556                  future.complete(result3);
3557                }
3558              });
3559            }
3560          });
3561        }
3562      });
3563    });
3564    return future;
3565  }
3566
3567  @Override
3568  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3569    if (tableName == null) {
3570      return failedFuture(new IllegalArgumentException("Table name is null"));
3571    }
3572    CompletableFuture<Void> future = new CompletableFuture<>();
3573    addListener(tableExists(tableName), (exist, err) -> {
3574      if (err != null) {
3575        future.completeExceptionally(err);
3576        return;
3577      }
3578      if (!exist) {
3579        future.completeExceptionally(new TableNotFoundException(
3580          "Table '" + tableName.getNameAsString() + "' does not exists."));
3581        return;
3582      }
3583      addListener(setTableReplication(tableName, false), (result, err2) -> {
3584        if (err2 != null) {
3585          future.completeExceptionally(err2);
3586        } else {
3587          future.complete(result);
3588        }
3589      });
3590    });
3591    return future;
3592  }
3593
3594  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3595    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3596    addListener(
3597      getRegions(tableName).thenApply(regions -> regions.stream()
3598        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3599      (regions, err2) -> {
3600        if (err2 != null) {
3601          future.completeExceptionally(err2);
3602          return;
3603        }
3604        if (regions.size() == 1) {
3605          future.complete(null);
3606        } else {
3607          byte[][] splits = new byte[regions.size() - 1][];
3608          for (int i = 1; i < regions.size(); i++) {
3609            splits[i - 1] = regions.get(i).getStartKey();
3610          }
3611          future.complete(splits);
3612        }
3613      });
3614    return future;
3615  }
3616
3617  /**
3618   * Connect to peer and check the table descriptor on peer:
3619   * <ol>
3620   * <li>Create the same table on peer when not exist.</li>
3621   * <li>Throw an exception if the table already has replication enabled on any of the column
3622   * families.</li>
3623   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3624   * </ol>
3625   * @param tableName name of the table to sync to the peer
3626   * @param splits    table split keys
3627   */
3628  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3629    byte[][] splits) {
3630    CompletableFuture<Void> future = new CompletableFuture<>();
3631    addListener(listReplicationPeers(), (peers, err) -> {
3632      if (err != null) {
3633        future.completeExceptionally(err);
3634        return;
3635      }
3636      if (peers == null || peers.size() <= 0) {
3637        future.completeExceptionally(
3638          new IllegalArgumentException("Found no peer cluster for replication."));
3639        return;
3640      }
3641      List<CompletableFuture<Void>> futures = new ArrayList<>();
3642      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3643        .forEach(peer -> {
3644          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3645        });
3646      addListener(
3647        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3648        (result, err2) -> {
3649          if (err2 != null) {
3650            future.completeExceptionally(err2);
3651          } else {
3652            future.complete(result);
3653          }
3654        });
3655    });
3656    return future;
3657  }
3658
3659  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3660    ReplicationPeerDescription peer) {
3661    Configuration peerConf = null;
3662    try {
3663      peerConf =
3664        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3665    } catch (IOException e) {
3666      return failedFuture(e);
3667    }
3668    CompletableFuture<Void> future = new CompletableFuture<>();
3669    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3670      if (err != null) {
3671        future.completeExceptionally(err);
3672        return;
3673      }
3674      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3675        if (err1 != null) {
3676          future.completeExceptionally(err1);
3677          return;
3678        }
3679        AsyncAdmin peerAdmin = conn.getAdmin();
3680        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3681          if (err2 != null) {
3682            future.completeExceptionally(err2);
3683            return;
3684          }
3685          if (!exist) {
3686            CompletableFuture<Void> createTableFuture = null;
3687            if (splits == null) {
3688              createTableFuture = peerAdmin.createTable(tableDesc);
3689            } else {
3690              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3691            }
3692            addListener(createTableFuture, (result, err3) -> {
3693              if (err3 != null) {
3694                future.completeExceptionally(err3);
3695              } else {
3696                future.complete(result);
3697              }
3698            });
3699          } else {
3700            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3701              (result, err4) -> {
3702                if (err4 != null) {
3703                  future.completeExceptionally(err4);
3704                } else {
3705                  future.complete(result);
3706                }
3707              });
3708          }
3709        });
3710      });
3711    });
3712    return future;
3713  }
3714
3715  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3716    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3717    CompletableFuture<Void> future = new CompletableFuture<>();
3718    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3719      if (err != null) {
3720        future.completeExceptionally(err);
3721        return;
3722      }
3723      if (peerTableDesc == null) {
3724        future.completeExceptionally(
3725          new IllegalArgumentException("Failed to get table descriptor for table "
3726            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3727        return;
3728      }
3729      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3730        future.completeExceptionally(new IllegalArgumentException(
3731          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3732            + ", but the table descriptors are not same when compared with source cluster."
3733            + " Thus can not enable the table's replication switch."));
3734        return;
3735      }
3736      future.complete(null);
3737    });
3738    return future;
3739  }
3740
3741  /**
3742   * Set the table's replication switch if the table's replication switch is already not set.
3743   * @param tableName name of the table
3744   * @param enableRep is replication switch enable or disable
3745   */
3746  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3747    CompletableFuture<Void> future = new CompletableFuture<>();
3748    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3749      if (err != null) {
3750        future.completeExceptionally(err);
3751        return;
3752      }
3753      if (!tableDesc.matchReplicationScope(enableRep)) {
3754        int scope =
3755          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3756        TableDescriptor newTableDesc =
3757          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3758        addListener(modifyTable(newTableDesc), (result, err2) -> {
3759          if (err2 != null) {
3760            future.completeExceptionally(err2);
3761          } else {
3762            future.complete(result);
3763          }
3764        });
3765      } else {
3766        future.complete(null);
3767      }
3768    });
3769    return future;
3770  }
3771
3772  @Override
3773  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
3774    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
3775    request.setPeerId(peerId);
3776    return this.<Boolean> newMasterCaller()
3777      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
3778        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
3779          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
3780          resp -> resp.getIsEnabled()))
3781      .call();
3782  }
3783
3784  private void waitUntilAllReplicationPeerModificationProceduresDone(
3785    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
3786    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
3787      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
3788        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
3789          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
3790            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
3791            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
3792            resp -> resp.getProcedureList()))
3793        .call();
3794    addListener(callFuture, (r, e) -> {
3795      if (e != null) {
3796        future.completeExceptionally(e);
3797      } else if (r.isEmpty()) {
3798        // we are done
3799        future.complete(prevOn);
3800      } else {
3801        // retry later to see if the procedures are done
3802        retryTimer.newTimeout(
3803          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
3804          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3805      }
3806    });
3807  }
3808
3809  @Override
3810  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
3811    boolean drainProcedures) {
3812    ReplicationPeerModificationSwitchRequest request =
3813      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
3814    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
3815      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
3816        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
3817          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
3818          resp -> resp.getPreviousValue()))
3819      .call();
3820    // if we do not need to wait all previous peer modification procedure done, or we are enabling
3821    // peer modification, just return here.
3822    if (!drainProcedures || on) {
3823      return callFuture;
3824    }
3825    // otherwise we need to wait until all previous peer modification procedure done
3826    CompletableFuture<Boolean> future = new CompletableFuture<>();
3827    addListener(callFuture, (prevOn, err) -> {
3828      if (err != null) {
3829        future.completeExceptionally(err);
3830        return;
3831      }
3832      // even if the previous state is disabled, we still need to wait here, as there could be
3833      // another client thread which called this method just before us and have already changed the
3834      // state to off, but there are still peer modification procedures not finished, so we should
3835      // also wait here.
3836      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
3837    });
3838    return future;
3839  }
3840
3841  @Override
3842  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
3843    return this.<Boolean> newMasterCaller()
3844      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
3845        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
3846          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
3847          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
3848          (resp) -> resp.getEnabled()))
3849      .call();
3850  }
3851
3852  @Override
3853  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3854    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3855    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3856      if (err != null) {
3857        future.completeExceptionally(err);
3858        return;
3859      }
3860      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3861        locations.stream().filter(l -> l.getRegion() != null)
3862          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3863          .collect(Collectors.groupingBy(l -> l.getServerName(),
3864            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3865      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3866      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3867      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3868        futures
3869          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3870            if (err2 != null) {
3871              future.completeExceptionally(unwrapCompletionException(err2));
3872            } else {
3873              aggregator.append(stats);
3874            }
3875          }));
3876      }
3877      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
3878        (ret, err3) -> {
3879          if (err3 != null) {
3880            future.completeExceptionally(unwrapCompletionException(err3));
3881          } else {
3882            future.complete(aggregator.sum());
3883          }
3884        });
3885    });
3886    return future;
3887  }
3888
3889  @Override
3890  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
3891    boolean preserveSplits) {
3892    CompletableFuture<Void> future = new CompletableFuture<>();
3893    addListener(tableExists(tableName), (exist, err) -> {
3894      if (err != null) {
3895        future.completeExceptionally(err);
3896        return;
3897      }
3898      if (!exist) {
3899        future.completeExceptionally(new TableNotFoundException(tableName));
3900        return;
3901      }
3902      addListener(tableExists(newTableName), (exist1, err1) -> {
3903        if (err1 != null) {
3904          future.completeExceptionally(err1);
3905          return;
3906        }
3907        if (exist1) {
3908          future.completeExceptionally(new TableExistsException(newTableName));
3909          return;
3910        }
3911        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
3912          if (err2 != null) {
3913            future.completeExceptionally(err2);
3914            return;
3915          }
3916          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
3917          if (preserveSplits) {
3918            addListener(getTableSplits(tableName), (splits, err3) -> {
3919              if (err3 != null) {
3920                future.completeExceptionally(err3);
3921              } else {
3922                addListener(
3923                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
3924                  (result, err4) -> {
3925                    if (err4 != null) {
3926                      future.completeExceptionally(err4);
3927                    } else {
3928                      future.complete(result);
3929                    }
3930                  });
3931              }
3932            });
3933          } else {
3934            addListener(createTable(newTableDesc), (result, err5) -> {
3935              if (err5 != null) {
3936                future.completeExceptionally(err5);
3937              } else {
3938                future.complete(result);
3939              }
3940            });
3941          }
3942        });
3943      });
3944    });
3945    return future;
3946  }
3947
3948  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
3949    List<RegionInfo> hris) {
3950    return this.<CacheEvictionStats> newAdminCaller()
3951      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
3952        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
3953          RequestConverter.buildClearRegionBlockCacheRequest(hris),
3954          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
3955          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
3956      .serverName(serverName).call();
3957  }
3958
3959  @Override
3960  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
3961    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3962      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
3963        Boolean> call(controller, stub,
3964          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
3965          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
3966          resp -> resp.getPreviousRpcThrottleEnabled()))
3967      .call();
3968    return future;
3969  }
3970
3971  @Override
3972  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
3973    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3974      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
3975        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
3976          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
3977          resp -> resp.getRpcThrottleEnabled()))
3978      .call();
3979    return future;
3980  }
3981
3982  @Override
3983  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
3984    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
3985      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
3986        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
3987          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
3988            .build(),
3989          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
3990          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
3991      .call();
3992    return future;
3993  }
3994
3995  @Override
3996  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
3997    return this.<Map<TableName, Long>> newMasterCaller()
3998      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
3999        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4000          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4001          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4002          resp -> resp.getSizesList().stream().collect(Collectors
4003            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4004      .call();
4005  }
4006
4007  @Override
4008  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4009    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4010    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4011      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4012        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4013          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4014          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4015          resp -> resp.getSnapshotsList().stream()
4016            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4017              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4018      .serverName(serverName).call();
4019  }
4020
4021  private CompletableFuture<SpaceQuotaSnapshot>
4022    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4023    return this.<SpaceQuotaSnapshot> newMasterCaller()
4024      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4025        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4026          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4027      .call();
4028  }
4029
4030  @Override
4031  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4032    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4033      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4034      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4035  }
4036
4037  @Override
4038  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4039    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4040    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4041      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4042      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4043  }
4044
4045  @Override
4046  public CompletableFuture<Void> grant(UserPermission userPermission,
4047    boolean mergeExistingPermissions) {
4048    return this.<Void> newMasterCaller()
4049      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4050        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4051        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4052      .call();
4053  }
4054
4055  @Override
4056  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4057    return this.<Void> newMasterCaller()
4058      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4059        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4060        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4061      .call();
4062  }
4063
4064  @Override
4065  public CompletableFuture<List<UserPermission>>
4066    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4067    return this.<List<UserPermission>> newMasterCaller()
4068      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4069        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4070          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4071          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4072          resp -> resp.getUserPermissionList().stream()
4073            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4074            .collect(Collectors.toList())))
4075      .call();
4076  }
4077
4078  @Override
4079  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4080    List<Permission> permissions) {
4081    return this.<List<Boolean>> newMasterCaller()
4082      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4083        List<Boolean>> call(controller, stub,
4084          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4085          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4086          resp -> resp.getHasUserPermissionList()))
4087      .call();
4088  }
4089
4090  @Override
4091  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4092    return this.<Boolean> newMasterCaller()
4093      .action((controller, stub) -> this.call(controller, stub,
4094        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4095        MasterService.Interface::switchSnapshotCleanup,
4096        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4097      .call();
4098  }
4099
4100  @Override
4101  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4102    return this.<Boolean> newMasterCaller()
4103      .action((controller, stub) -> this.call(controller, stub,
4104        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4105        MasterService.Interface::isSnapshotCleanupEnabled,
4106        IsSnapshotCleanupEnabledResponse::getEnabled))
4107      .call();
4108  }
4109
4110  @Override
4111  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4112    return this.<Void> newMasterCaller()
4113      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4114        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4115        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4116      .call();
4117  }
4118
4119  @Override
4120  public CompletableFuture<Void> addRSGroup(String groupName) {
4121    return this.<Void> newMasterCaller()
4122      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4123        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4124        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4125      .call();
4126  }
4127
4128  @Override
4129  public CompletableFuture<Void> removeRSGroup(String groupName) {
4130    return this.<Void> newMasterCaller()
4131      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4132        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4133        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4134      .call();
4135  }
4136
4137  @Override
4138  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4139    BalanceRequest request) {
4140    return this.<BalanceResponse> newMasterCaller()
4141      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4142        BalanceResponse> call(controller, stub,
4143          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4144          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4145      .call();
4146  }
4147
4148  @Override
4149  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4150    return this.<List<RSGroupInfo>> newMasterCaller()
4151      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4152        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4153          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4154            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4155      .call();
4156  }
4157
4158  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4159    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4160    final String logType) {
4161    if (CollectionUtils.isEmpty(serverNames)) {
4162      return CompletableFuture.completedFuture(Collections.emptyList());
4163    }
4164    return CompletableFuture.supplyAsync(() -> serverNames
4165      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4166        filterParams, limit, logType))
4167      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4168  }
4169
4170  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4171    Map<String, Object> filterParams, int limit, String logType) {
4172    return this.<List<LogEntry>> newAdminCaller()
4173      .action((controller, stub) -> this.adminCall(controller, stub,
4174        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4175        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4176      .serverName(serverName).call();
4177  }
4178
4179  @Override
4180  public CompletableFuture<List<Boolean>>
4181    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4182    if (CollectionUtils.isEmpty(serverNames)) {
4183      return CompletableFuture.completedFuture(Collections.emptyList());
4184    }
4185    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4186      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4187    return convertToFutureOfList(clearSlowLogResponseList);
4188  }
4189
4190  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4191    return this.<Boolean> newAdminCaller()
4192      .action((controller, stub) -> this.adminCall(controller, stub,
4193        RequestConverter.buildClearSlowLogResponseRequest(),
4194        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4195      .serverName(serverName).call();
4196  }
4197
4198  private static <T> CompletableFuture<List<T>>
4199    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4200    CompletableFuture<Void> allDoneFuture =
4201      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4202    return allDoneFuture
4203      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4204  }
4205
4206  @Override
4207  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4208    return this.<List<TableName>> newMasterCaller()
4209      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4210        List<TableName>> call(controller, stub,
4211          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4212          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4213            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4214      .call();
4215  }
4216
4217  @Override
4218  public CompletableFuture<Pair<List<String>, List<TableName>>>
4219    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4220    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4221      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4222        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4223        Pair<List<String>, List<TableName>>> call(controller, stub,
4224          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4225            .build(),
4226          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4227          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4228            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4229      .call();
4230  }
4231
4232  @Override
4233  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4234    return this.<RSGroupInfo> newMasterCaller()
4235      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4236        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4237          GetRSGroupInfoOfServerRequest.newBuilder()
4238            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4239              .setPort(hostPort.getPort()).build())
4240            .build(),
4241          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4242          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4243      .call();
4244  }
4245
4246  @Override
4247  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4248    return this.<Void> newMasterCaller()
4249      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4250        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4251        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4252      .call();
4253  }
4254
4255  @Override
4256  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4257    CompletableFuture<Void> future = new CompletableFuture<>();
4258    for (TableName tableName : tables) {
4259      addListener(tableExists(tableName), (exist, err) -> {
4260        if (err != null) {
4261          future.completeExceptionally(err);
4262          return;
4263        }
4264        if (!exist) {
4265          future.completeExceptionally(new TableNotFoundException(tableName));
4266          return;
4267        }
4268      });
4269    }
4270    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4271      if (err != null) {
4272        future.completeExceptionally(err);
4273        return;
4274      }
4275      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4276        future.complete(null);
4277        return;
4278      }
4279      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4280      for (TableDescriptor td : tableDescriptions) {
4281        newTableDescriptors
4282          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4283      }
4284      addListener(
4285        CompletableFuture.allOf(
4286          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4287        (v, e) -> {
4288          if (e != null) {
4289            future.completeExceptionally(e);
4290          } else {
4291            future.complete(v);
4292          }
4293        });
4294    });
4295    return future;
4296  }
4297
4298  @Override
4299  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4300    return this.<RSGroupInfo> newMasterCaller()
4301      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4302        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4303          GetRSGroupInfoOfTableRequest.newBuilder()
4304            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4305          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4306          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4307      .call();
4308  }
4309
4310  @Override
4311  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4312    return this.<RSGroupInfo> newMasterCaller()
4313      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4314        RSGroupInfo> call(controller, stub,
4315          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4316          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4317          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4318      .call();
4319  }
4320
4321  @Override
4322  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4323    return this.<Void> newMasterCaller()
4324      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4325        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4326          .setNewRsgroupName(newName).build(),
4327        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4328      .call();
4329  }
4330
4331  @Override
4332  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4333    Map<String, String> configuration) {
4334    UpdateRSGroupConfigRequest.Builder request =
4335      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4336    if (configuration != null) {
4337      configuration.entrySet().forEach(e -> request.addConfiguration(
4338        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4339    }
4340    return this.<Void> newMasterCaller()
4341      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4342        Void> call(controller, stub, request.build(),
4343          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4344      .call();
4345  }
4346
4347  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4348    return this.<List<LogEntry>> newMasterCaller()
4349      .action((controller, stub) -> this.call(controller, stub,
4350        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4351        ProtobufUtil::toBalancerDecisionResponse))
4352      .call();
4353  }
4354
4355  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4356    return this.<List<LogEntry>> newMasterCaller()
4357      .action((controller, stub) -> this.call(controller, stub,
4358        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4359        ProtobufUtil::toBalancerRejectionResponse))
4360      .call();
4361  }
4362
4363  @Override
4364  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4365    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4366    if (logType == null || serverType == null) {
4367      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4368    }
4369    switch (logType) {
4370      case "SLOW_LOG":
4371      case "LARGE_LOG":
4372        if (ServerType.MASTER.equals(serverType)) {
4373          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4374        }
4375        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4376      case "BALANCER_DECISION":
4377        if (ServerType.REGION_SERVER.equals(serverType)) {
4378          throw new IllegalArgumentException(
4379            "Balancer Decision logs are not maintained by HRegionServer");
4380        }
4381        return getBalancerDecisions(limit);
4382      case "BALANCER_REJECTION":
4383        if (ServerType.REGION_SERVER.equals(serverType)) {
4384          throw new IllegalArgumentException(
4385            "Balancer Rejection logs are not maintained by HRegionServer");
4386        }
4387        return getBalancerRejections(limit);
4388      default:
4389        return CompletableFuture.completedFuture(Collections.emptyList());
4390    }
4391  }
4392
4393  @Override
4394  public CompletableFuture<Void> flushMasterStore() {
4395    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4396    return this.<Void> newMasterCaller()
4397      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4398        Void> call(controller, stub, request.build(),
4399          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4400      .call();
4401  }
4402}