001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.net.URI;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
052import org.apache.hadoop.hbase.CatalogFamilyFormat;
053import org.apache.hadoop.hbase.ClientMetaTableAccessor;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.NamespaceDescriptor;
061import org.apache.hadoop.hbase.RegionLocations;
062import org.apache.hadoop.hbase.RegionMetrics;
063import org.apache.hadoop.hbase.RegionMetricsBuilder;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableExistsException;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotDisabledException;
068import org.apache.hadoop.hbase.TableNotEnabledException;
069import org.apache.hadoop.hbase.TableNotFoundException;
070import org.apache.hadoop.hbase.UnknownRegionException;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.Scan.ReadType;
075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
076import org.apache.hadoop.hbase.client.replication.TableCFs;
077import org.apache.hadoop.hbase.client.security.SecurityCapability;
078import org.apache.hadoop.hbase.exceptions.DeserializationException;
079import org.apache.hadoop.hbase.ipc.HBaseRpcController;
080import org.apache.hadoop.hbase.net.Address;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
086import org.apache.hadoop.hbase.replication.ReplicationException;
087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
089import org.apache.hadoop.hbase.replication.SyncReplicationState;
090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
092import org.apache.hadoop.hbase.security.access.Permission;
093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
094import org.apache.hadoop.hbase.security.access.UserPermission;
095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.hbase.util.Strings;
103import org.apache.yetus.audience.InterfaceAudience;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
108import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
109import org.apache.hbase.thirdparty.com.google.protobuf.Message;
110import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
111import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
112import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
113import org.apache.hbase.thirdparty.io.netty.util.Timeout;
114import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
115import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
116
117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
118import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
357import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
358import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
359import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
360import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
361import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
362
363/**
364 * The implementation of AsyncAdmin.
365 * <p>
366 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
367 * be finished inside the rpc framework thread, which means that the callbacks registered to the
368 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
369 * this class should not try to do time consuming tasks in the callbacks.
370 * @since 2.0.0
371 * @see AsyncHBaseAdmin
372 * @see AsyncConnection#getAdmin()
373 * @see AsyncConnection#getAdminBuilder()
374 */
375@InterfaceAudience.Private
376class RawAsyncHBaseAdmin implements AsyncAdmin {
377
378  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
379
380  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
381
382  private final AsyncConnectionImpl connection;
383
384  private final HashedWheelTimer retryTimer;
385
386  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
387
388  private final long rpcTimeoutNs;
389
390  private final long operationTimeoutNs;
391
392  private final long pauseNs;
393
394  private final long pauseNsForServerOverloaded;
395
396  private final int maxAttempts;
397
398  private final int startLogErrorsCnt;
399
400  private final NonceGenerator ng;
401
402  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
403    AsyncAdminBuilderBase builder) {
404    this.connection = connection;
405    this.retryTimer = retryTimer;
406    this.metaTable = connection.getTable(META_TABLE_NAME);
407    this.rpcTimeoutNs = builder.rpcTimeoutNs;
408    this.operationTimeoutNs = builder.operationTimeoutNs;
409    this.pauseNs = builder.pauseNs;
410    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
411      LOG.warn(
412        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
413          + " the normal pause value {} ms, use the greater one instead",
414        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
415        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
416      this.pauseNsForServerOverloaded = builder.pauseNs;
417    } else {
418      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
419    }
420    this.maxAttempts = builder.maxAttempts;
421    this.startLogErrorsCnt = builder.startLogErrorsCnt;
422    this.ng = connection.getNonceGenerator();
423  }
424
425  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
426    return this.connection.callerFactory.<T> masterRequest()
427      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
428      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
429      .pause(pauseNs, TimeUnit.NANOSECONDS)
430      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
431      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
432  }
433
434  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
435    return this.connection.callerFactory.<T> adminRequest()
436      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
437      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
438      .pause(pauseNs, TimeUnit.NANOSECONDS)
439      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
440      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
441  }
442
443  @FunctionalInterface
444  private interface MasterRpcCall<RESP, REQ> {
445    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
446      RpcCallback<RESP> done);
447  }
448
449  @FunctionalInterface
450  private interface AdminRpcCall<RESP, REQ> {
451    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
452      RpcCallback<RESP> done);
453  }
454
455  @FunctionalInterface
456  private interface Converter<D, S> {
457    D convert(S src) throws IOException;
458  }
459
460  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
461    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
462    Converter<RESP, PRESP> respConverter) {
463    CompletableFuture<RESP> future = new CompletableFuture<>();
464    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
465
466      @Override
467      public void run(PRESP resp) {
468        if (controller.failed()) {
469          future.completeExceptionally(controller.getFailed());
470        } else {
471          try {
472            future.complete(respConverter.convert(resp));
473          } catch (IOException e) {
474            future.completeExceptionally(e);
475          }
476        }
477      }
478    });
479    return future;
480  }
481
482  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
483    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
484    Converter<RESP, PRESP> respConverter) {
485    CompletableFuture<RESP> future = new CompletableFuture<>();
486    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
487
488      @Override
489      public void run(PRESP resp) {
490        if (controller.failed()) {
491          future.completeExceptionally(controller.getFailed());
492        } else {
493          try {
494            future.complete(respConverter.convert(resp));
495          } catch (IOException e) {
496            future.completeExceptionally(e);
497          }
498        }
499      }
500    });
501    return future;
502  }
503
504  /**
505   * short-circuit call for
506   * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter, Converter, ProcedureBiConsumer)}
507   * by ignoring procedure result
508   */
509  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
510    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
511    ProcedureBiConsumer<Void> consumer) {
512    return procedureCall(preq, rpcCall, respConverter, result -> null, consumer);
513  }
514
515  /**
516   * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter,
517   * ProcedureBiConsumer) by skip setting priority for request
518   */
519  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(PREQ preq,
520    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
521    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
522    return procedureCall(b -> {
523    }, preq, rpcCall, respConverter, resultConverter, consumer);
524  }
525
526  /**
527   * short-circuit call for procedureCall(TableName, Object, MasterRpcCall, Converter, Converter,
528   * ProcedureBiConsumer) by ignoring procedure result
529   */
530  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
531    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
532    ProcedureBiConsumer<Void> consumer) {
533    return procedureCall(tableName, preq, rpcCall, respConverter, result -> null, consumer);
534  }
535
536  /**
537   * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter,
538   * ProcedureBiConsumer) by skip setting priority for request
539   */
540  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName tableName, PREQ preq,
541    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
542    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
543    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, resultConverter,
544      consumer);
545  }
546
547  /**
548   * @param <PREQ>          type of request
549   * @param <PRESP>         type of response
550   * @param <PRES>          type of procedure call result
551   * @param prioritySetter  prioritySetter set priority by table for request
552   * @param preq            procedure call request
553   * @param rpcCall         procedure rpc call
554   * @param respConverter   extract proc id from procedure call response
555   * @param resultConverter extract result from procedure call result
556   * @param consumer        action performs on result
557   * @return procedure call result, null if procedure is void
558   */
559  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(
560    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
561    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
562    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
563    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller()
564      .action((controller, stub) -> this.call(controller, stub, preq, rpcCall, respConverter));
565    prioritySetter.accept(builder);
566    CompletableFuture<Long> procFuture = builder.call();
567    CompletableFuture<PRES> future = waitProcedureResult(procFuture, resultConverter);
568    addListener(future, consumer);
569    return future;
570  }
571
572  @Override
573  public CompletableFuture<Boolean> tableExists(TableName tableName) {
574    if (TableName.isMetaTableName(tableName)) {
575      return CompletableFuture.completedFuture(true);
576    }
577    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
578  }
579
580  @Override
581  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
582    return getTableDescriptors(
583      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
584  }
585
586  /**
587   * {@link #listTableDescriptors(boolean)}
588   */
589  @Override
590  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
591    boolean includeSysTables) {
592    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
593      + "use listTableDescriptors(boolean) instead");
594    return getTableDescriptors(
595      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
596  }
597
598  @Override
599  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
600    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
601      + "use listTableDescriptors(boolean) instead");
602    if (tableNames.isEmpty()) {
603      return CompletableFuture.completedFuture(Collections.emptyList());
604    }
605    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
606  }
607
608  private CompletableFuture<List<TableDescriptor>>
609    getTableDescriptors(GetTableDescriptorsRequest request) {
610    return this.<List<TableDescriptor>> newMasterCaller()
611      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
612        List<TableDescriptor>> call(controller, stub, request,
613          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
614          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
615      .call();
616  }
617
618  @Override
619  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
620    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
621  }
622
623  @Override
624  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
625    boolean includeSysTables) {
626    Preconditions.checkNotNull(pattern,
627      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
628    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
629  }
630
631  @Override
632  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
633    return this.<List<TableName>> newMasterCaller()
634      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
635        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
636          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
637          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
638          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
639      .call();
640  }
641
642  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
643    return this.<List<TableName>> newMasterCaller()
644      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
645        List<TableName>> call(controller, stub, request,
646          (s, c, req, done) -> s.getTableNames(c, req, done),
647          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
648      .call();
649  }
650
651  @Override
652  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
653    return this.<List<TableDescriptor>> newMasterCaller()
654      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
655        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
656          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
657          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
658          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
659      .call();
660  }
661
662  @Override
663  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
664    return this.<List<TableDescriptor>> newMasterCaller()
665      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
666        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
667          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
668          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
669          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
670      .call();
671  }
672
673  @Override
674  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
675    return this.<List<TableName>> newMasterCaller()
676      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
677        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
678          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
679          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
680          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
681      .call();
682  }
683
684  @Override
685  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
686    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
687    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
688      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
689        List<TableSchema>> call(controller, stub,
690          RequestConverter.buildGetTableDescriptorsRequest(tableName),
691          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
692          (resp) -> resp.getTableSchemaList()))
693      .call(), (tableSchemas, error) -> {
694        if (error != null) {
695          future.completeExceptionally(error);
696          return;
697        }
698        if (!tableSchemas.isEmpty()) {
699          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
700        } else {
701          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
702        }
703      });
704    return future;
705  }
706
707  @Override
708  public CompletableFuture<Void> createTable(TableDescriptor desc) {
709    return createTable(desc.getTableName(),
710      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
711  }
712
713  @Override
714  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
715    int numRegions) {
716    try {
717      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
718    } catch (IllegalArgumentException e) {
719      return failedFuture(e);
720    }
721  }
722
723  @Override
724  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
725    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
726      + " use createTable(TableDescriptor) instead");
727    try {
728      verifySplitKeys(splitKeys);
729      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
730        splitKeys, ng.getNonceGroup(), ng.newNonce()));
731    } catch (IllegalArgumentException e) {
732      return failedFuture(e);
733    }
734  }
735
736  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
737    Preconditions.checkNotNull(tableName, "table name is null");
738    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
739      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
740      new CreateTableProcedureBiConsumer(tableName));
741  }
742
743  @Override
744  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
745    return modifyTable(desc, true);
746  }
747
748  @Override
749  public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) {
750    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
751      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
752        ng.newNonce(), reopenRegions),
753      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
754      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
755  }
756
757  @Override
758  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
759    return this.<ModifyTableStoreFileTrackerRequest,
760      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
761        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
762          ng.getNonceGroup(), ng.newNonce()),
763        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
764        (resp) -> resp.getProcId(),
765        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
766  }
767
768  @Override
769  public CompletableFuture<Void> deleteTable(TableName tableName) {
770    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
771      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
772      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
773      new DeleteTableProcedureBiConsumer(tableName));
774  }
775
776  @Override
777  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
778    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
779      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
780        ng.newNonce()),
781      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
782      new TruncateTableProcedureBiConsumer(tableName));
783  }
784
785  @Override
786  public CompletableFuture<Void> enableTable(TableName tableName) {
787    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
788      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
789      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
790      new EnableTableProcedureBiConsumer(tableName));
791  }
792
793  @Override
794  public CompletableFuture<Void> disableTable(TableName tableName) {
795    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
796      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
797      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
798      new DisableTableProcedureBiConsumer(tableName));
799  }
800
801  /**
802   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
803   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
804   * targetState).
805   */
806  private static CompletableFuture<Boolean> completeCheckTableState(
807    CompletableFuture<Boolean> future, Optional<TableState> tableState, Throwable error,
808    TableState.State targetState, TableName tableName) {
809    if (error != null) {
810      future.completeExceptionally(error);
811    } else {
812      if (tableState.isPresent()) {
813        future.complete(tableState.get().inStates(targetState));
814      } else {
815        future.completeExceptionally(new TableNotFoundException(tableName));
816      }
817    }
818    return future;
819  }
820
821  @Override
822  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
823    if (TableName.isMetaTableName(tableName)) {
824      return CompletableFuture.completedFuture(true);
825    }
826    CompletableFuture<Boolean> future = new CompletableFuture<>();
827    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
828      (tableState, error) -> {
829        completeCheckTableState(future, tableState, error, TableState.State.ENABLED, tableName);
830      });
831    return future;
832  }
833
834  @Override
835  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
836    if (TableName.isMetaTableName(tableName)) {
837      return CompletableFuture.completedFuture(false);
838    }
839    CompletableFuture<Boolean> future = new CompletableFuture<>();
840    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
841      (tableState, error) -> {
842        completeCheckTableState(future, tableState, error, TableState.State.DISABLED, tableName);
843      });
844    return future;
845  }
846
847  @Override
848  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
849    if (TableName.isMetaTableName(tableName)) {
850      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
851        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
852    }
853    CompletableFuture<Boolean> future = new CompletableFuture<>();
854    addListener(isTableEnabled(tableName), (enabled, error) -> {
855      if (error != null) {
856        if (error instanceof TableNotFoundException) {
857          future.complete(false);
858        } else {
859          future.completeExceptionally(error);
860        }
861        return;
862      }
863      if (!enabled) {
864        future.complete(false);
865      } else {
866        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
867          (locations, error1) -> {
868            if (error1 != null) {
869              future.completeExceptionally(error1);
870              return;
871            }
872            List<HRegionLocation> notDeployedRegions = locations.stream()
873              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
874            if (notDeployedRegions.size() > 0) {
875              if (LOG.isDebugEnabled()) {
876                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
877              }
878              future.complete(false);
879              return;
880            }
881            future.complete(true);
882          });
883      }
884    });
885    return future;
886  }
887
888  @Override
889  public CompletableFuture<Void> addColumnFamily(TableName tableName,
890    ColumnFamilyDescriptor columnFamily) {
891    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
892      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
893        ng.newNonce()),
894      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
895      new AddColumnFamilyProcedureBiConsumer(tableName));
896  }
897
898  @Override
899  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
900    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
901      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
902        ng.newNonce()),
903      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
904      new DeleteColumnFamilyProcedureBiConsumer(tableName));
905  }
906
907  @Override
908  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
909    ColumnFamilyDescriptor columnFamily) {
910    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
911      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
912        ng.newNonce()),
913      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
914      new ModifyColumnFamilyProcedureBiConsumer(tableName));
915  }
916
917  @Override
918  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
919    byte[] family, String dstSFT) {
920    return this.<ModifyColumnStoreFileTrackerRequest,
921      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
922        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
923          ng.getNonceGroup(), ng.newNonce()),
924        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
925        (resp) -> resp.getProcId(),
926        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
927  }
928
929  @Override
930  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
931    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
932      RequestConverter.buildCreateNamespaceRequest(descriptor),
933      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
934      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
935  }
936
937  @Override
938  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
939    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
940      RequestConverter.buildModifyNamespaceRequest(descriptor),
941      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
942      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
943  }
944
945  @Override
946  public CompletableFuture<Void> deleteNamespace(String name) {
947    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
948      RequestConverter.buildDeleteNamespaceRequest(name),
949      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
950      new DeleteNamespaceProcedureBiConsumer(name));
951  }
952
953  @Override
954  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
955    return this.<NamespaceDescriptor> newMasterCaller()
956      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
957        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
958          RequestConverter.buildGetNamespaceDescriptorRequest(name),
959          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
960          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
961      .call();
962  }
963
964  @Override
965  public CompletableFuture<List<String>> listNamespaces() {
966    return this.<List<String>> newMasterCaller()
967      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
968        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
969          (s, c, req, done) -> s.listNamespaces(c, req, done),
970          (resp) -> resp.getNamespaceNameList()))
971      .call();
972  }
973
974  @Override
975  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
976    return this.<List<NamespaceDescriptor>> newMasterCaller()
977      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
978        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
979          ListNamespaceDescriptorsRequest.newBuilder().build(),
980          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
981          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
982      .call();
983  }
984
985  @Override
986  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
987    return this.<List<RegionInfo>> newAdminCaller()
988      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
989        List<RegionInfo>> adminCall(controller, stub,
990          RequestConverter.buildGetOnlineRegionRequest(),
991          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
992          resp -> ProtobufUtil.getRegionInfos(resp)))
993      .serverName(serverName).call();
994  }
995
996  @Override
997  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
998    if (tableName.equals(META_TABLE_NAME)) {
999      return connection.registry.getMetaRegionLocations()
1000        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
1001          .collect(Collectors.toList()));
1002    } else {
1003      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
1004        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
1005    }
1006  }
1007
1008  @Override
1009  public CompletableFuture<Void> flush(TableName tableName) {
1010    return flush(tableName, Collections.emptyList());
1011  }
1012
1013  @Override
1014  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
1015    Preconditions.checkNotNull(columnFamily,
1016      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
1017    return flush(tableName, Collections.singletonList(columnFamily));
1018  }
1019
1020  @Override
1021  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
1022    // This is for keeping compatibility with old implementation.
1023    // If the server version is lower than the client version, it's possible that the
1024    // flushTable method is not present in the server side, if so, we need to fall back
1025    // to the old implementation.
1026    Preconditions.checkNotNull(columnFamilyList,
1027      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
1028    List<byte[]> columnFamilies = columnFamilyList.stream()
1029      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
1030    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
1031      ng.getNonceGroup(), ng.newNonce());
1032    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
1033      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
1034      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
1035    CompletableFuture<Void> future = new CompletableFuture<>();
1036    addListener(procFuture, (ret, error) -> {
1037      if (error != null) {
1038        if (
1039          error instanceof TableNotFoundException || error instanceof TableNotEnabledException
1040            || error instanceof NoSuchColumnFamilyException
1041        ) {
1042          future.completeExceptionally(error);
1043        } else if (error instanceof DoNotRetryIOException) {
1044          // usually this is caused by the method is not present on the server or
1045          // the hbase hadoop version does not match the running hadoop version.
1046          // if that happens, we need fall back to the old flush implementation.
1047          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
1048          legacyFlush(future, tableName, columnFamilies);
1049        } else {
1050          future.completeExceptionally(error);
1051        }
1052      } else {
1053        future.complete(ret);
1054      }
1055    });
1056    return future;
1057  }
1058
1059  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1060    List<byte[]> columnFamilies) {
1061    addListener(tableExists(tableName), (exists, err) -> {
1062      if (err != null) {
1063        future.completeExceptionally(err);
1064      } else if (!exists) {
1065        future.completeExceptionally(new TableNotFoundException(tableName));
1066      } else {
1067        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
1068          if (err2 != null) {
1069            future.completeExceptionally(err2);
1070          } else if (!tableEnabled) {
1071            future.completeExceptionally(new TableNotEnabledException(tableName));
1072          } else {
1073            Map<String, String> props = new HashMap<>();
1074            if (columnFamilies != null && !columnFamilies.isEmpty()) {
1075              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1076                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
1077            }
1078            addListener(
1079              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
1080              (ret, err3) -> {
1081                if (err3 != null) {
1082                  future.completeExceptionally(err3);
1083                } else {
1084                  future.complete(ret);
1085                }
1086              });
1087          }
1088        });
1089      }
1090    });
1091  }
1092
1093  @Override
1094  public CompletableFuture<Void> flushRegion(byte[] regionName) {
1095    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
1096    });
1097  }
1098
1099  @Override
1100  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
1101    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1102      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
1103    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
1104    });
1105  }
1106
1107  /**
1108   * This method is for internal use only, where we need the response of the flush.
1109   * <p/>
1110   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1111   * API.
1112   */
1113  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1114    boolean writeFlushWALMarker) {
1115    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1116    addListener(getRegionLocation(regionName), (location, err) -> {
1117      if (err != null) {
1118        future.completeExceptionally(err);
1119        return;
1120      }
1121      ServerName serverName = location.getServerName();
1122      if (serverName == null) {
1123        future
1124          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1125        return;
1126      }
1127      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1128        (ret, err2) -> {
1129          if (err2 != null) {
1130            future.completeExceptionally(err2);
1131          } else {
1132            future.complete(ret);
1133          }
1134        });
1135    });
1136    return future;
1137  }
1138
1139  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1140    byte[] columnFamily, boolean writeFlushWALMarker) {
1141    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1142      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1143        FlushRegionResponse> adminCall(controller, stub,
1144          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1145            writeFlushWALMarker),
1146          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1147      .call();
1148  }
1149
1150  @Override
1151  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1152    CompletableFuture<Void> future = new CompletableFuture<>();
1153    addListener(getRegions(sn), (hRegionInfos, err) -> {
1154      if (err != null) {
1155        future.completeExceptionally(err);
1156        return;
1157      }
1158      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1159      if (hRegionInfos != null) {
1160        hRegionInfos
1161          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1162          })));
1163      }
1164      addListener(CompletableFuture.allOf(
1165        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1166          if (err2 != null) {
1167            future.completeExceptionally(err2);
1168          } else {
1169            future.complete(ret);
1170          }
1171        });
1172    });
1173    return future;
1174  }
1175
1176  @Override
1177  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1178    return compact(tableName, null, false, compactType);
1179  }
1180
1181  @Override
1182  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1183    CompactType compactType) {
1184    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1185      + "If you don't specify a columnFamily, use compact(TableName) instead");
1186    return compact(tableName, columnFamily, false, compactType);
1187  }
1188
1189  @Override
1190  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1191    return compactRegion(regionName, null, false);
1192  }
1193
1194  @Override
1195  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1196    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1197      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1198    return compactRegion(regionName, columnFamily, false);
1199  }
1200
1201  @Override
1202  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1203    return compact(tableName, null, true, compactType);
1204  }
1205
1206  @Override
1207  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1208    CompactType compactType) {
1209    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1210      + "If you don't specify a columnFamily, use compact(TableName) instead");
1211    return compact(tableName, columnFamily, true, compactType);
1212  }
1213
1214  @Override
1215  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1216    return compactRegion(regionName, null, true);
1217  }
1218
1219  @Override
1220  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1221    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1222      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1223    return compactRegion(regionName, columnFamily, true);
1224  }
1225
1226  @Override
1227  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1228    return compactRegionServer(sn, false);
1229  }
1230
1231  @Override
1232  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1233    return compactRegionServer(sn, true);
1234  }
1235
1236  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1237    CompletableFuture<Void> future = new CompletableFuture<>();
1238    addListener(getRegions(sn), (hRegionInfos, err) -> {
1239      if (err != null) {
1240        future.completeExceptionally(err);
1241        return;
1242      }
1243      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1244      if (hRegionInfos != null) {
1245        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1246      }
1247      addListener(CompletableFuture.allOf(
1248        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1249          if (err2 != null) {
1250            future.completeExceptionally(err2);
1251          } else {
1252            future.complete(ret);
1253          }
1254        });
1255    });
1256    return future;
1257  }
1258
1259  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1260    boolean major) {
1261    CompletableFuture<Void> future = new CompletableFuture<>();
1262    addListener(getRegionLocation(regionName), (location, err) -> {
1263      if (err != null) {
1264        future.completeExceptionally(err);
1265        return;
1266      }
1267      ServerName serverName = location.getServerName();
1268      if (serverName == null) {
1269        future
1270          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1271        return;
1272      }
1273      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1274        (ret, err2) -> {
1275          if (err2 != null) {
1276            future.completeExceptionally(err2);
1277          } else {
1278            future.complete(ret);
1279          }
1280        });
1281    });
1282    return future;
1283  }
1284
1285  /**
1286   * List all region locations for the specific table.
1287   */
1288  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1289    if (TableName.META_TABLE_NAME.equals(tableName)) {
1290      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1291      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1292        if (err != null) {
1293          future.completeExceptionally(err);
1294        } else if (
1295          metaRegions == null || metaRegions.isEmpty()
1296            || metaRegions.getDefaultRegionLocation() == null
1297        ) {
1298          future.completeExceptionally(new IOException("meta region does not found"));
1299        } else {
1300          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1301        }
1302      });
1303      return future;
1304    } else {
1305      // For non-meta table, we fetch all locations by scanning hbase:meta table
1306      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1307    }
1308  }
1309
1310  /**
1311   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1312   */
1313  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1314    CompactType compactType) {
1315    CompletableFuture<Void> future = new CompletableFuture<>();
1316
1317    switch (compactType) {
1318      case MOB:
1319        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1320          if (err != null) {
1321            future.completeExceptionally(err);
1322            return;
1323          }
1324          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1325          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1326            if (err2 != null) {
1327              future.completeExceptionally(err2);
1328            } else {
1329              future.complete(ret);
1330            }
1331          });
1332        });
1333        break;
1334      case NORMAL:
1335        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1336          if (err != null) {
1337            future.completeExceptionally(err);
1338            return;
1339          }
1340          if (locations == null || locations.isEmpty()) {
1341            future.completeExceptionally(new TableNotFoundException(tableName));
1342          }
1343          CompletableFuture<?>[] compactFutures =
1344            locations.stream().filter(l -> l.getRegion() != null)
1345              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1346              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1347              .toArray(CompletableFuture<?>[]::new);
1348          // future complete unless all of the compact futures are completed.
1349          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1350            if (err2 != null) {
1351              future.completeExceptionally(err2);
1352            } else {
1353              future.complete(ret);
1354            }
1355          });
1356        });
1357        break;
1358      default:
1359        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1360    }
1361    return future;
1362  }
1363
1364  /**
1365   * Compact the region at specific region server.
1366   */
1367  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1368    final boolean major, byte[] columnFamily) {
1369    return this.<Void> newAdminCaller().serverName(sn)
1370      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1371        Void> adminCall(controller, stub,
1372          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1373          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1374      .call();
1375  }
1376
1377  private byte[] toEncodeRegionName(byte[] regionName) {
1378    return RegionInfo.isEncodedRegionName(regionName)
1379      ? regionName
1380      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1381  }
1382
1383  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1384    CompletableFuture<TableName> result) {
1385    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1386      if (err != null) {
1387        result.completeExceptionally(err);
1388        return;
1389      }
1390      RegionInfo regionInfo = location.getRegion();
1391      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1392        result.completeExceptionally(
1393          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1394        return;
1395      }
1396      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1397        if (!tableName.get().equals(regionInfo.getTable())) {
1398          // tables of this two region should be same.
1399          result.completeExceptionally(
1400            new IllegalArgumentException("Cannot merge regions from two different tables "
1401              + tableName.get() + " and " + regionInfo.getTable()));
1402        } else {
1403          result.complete(tableName.get());
1404        }
1405      }
1406    });
1407  }
1408
1409  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1410    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1411    CompletableFuture<TableName> future = new CompletableFuture<>();
1412    for (byte[] encodedRegionName : encodedRegionNames) {
1413      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1414    }
1415    return future;
1416  }
1417
1418  @Override
1419  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1420    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1421  }
1422
1423  @Override
1424  public CompletableFuture<Boolean> isMergeEnabled() {
1425    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1426  }
1427
1428  @Override
1429  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1430    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1431  }
1432
1433  @Override
1434  public CompletableFuture<Boolean> isSplitEnabled() {
1435    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1436  }
1437
1438  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1439    MasterSwitchType switchType) {
1440    SetSplitOrMergeEnabledRequest request =
1441      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1442    return this.<Boolean> newMasterCaller()
1443      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1444        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1445          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1446          (resp) -> resp.getPrevValueList().get(0)))
1447      .call();
1448  }
1449
1450  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1451    IsSplitOrMergeEnabledRequest request =
1452      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1453    return this.<Boolean> newMasterCaller()
1454      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1455        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1456          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1457      .call();
1458  }
1459
1460  @Override
1461  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1462    if (nameOfRegionsToMerge.size() < 2) {
1463      return failedFuture(new IllegalArgumentException(
1464        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1465    }
1466    CompletableFuture<Void> future = new CompletableFuture<>();
1467    byte[][] encodedNameOfRegionsToMerge =
1468      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1469
1470    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1471      if (err != null) {
1472        future.completeExceptionally(err);
1473        return;
1474      }
1475
1476      final MergeTableRegionsRequest request;
1477      try {
1478        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1479          forcible, ng.getNonceGroup(), ng.newNonce());
1480      } catch (DeserializationException e) {
1481        future.completeExceptionally(e);
1482        return;
1483      }
1484
1485      addListener(
1486        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1487          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1488        (ret, err2) -> {
1489          if (err2 != null) {
1490            future.completeExceptionally(err2);
1491          } else {
1492            future.complete(ret);
1493          }
1494        });
1495    });
1496    return future;
1497  }
1498
1499  @Override
1500  public CompletableFuture<Void> split(TableName tableName) {
1501    CompletableFuture<Void> future = new CompletableFuture<>();
1502    addListener(tableExists(tableName), (exist, error) -> {
1503      if (error != null) {
1504        future.completeExceptionally(error);
1505        return;
1506      }
1507      if (!exist) {
1508        future.completeExceptionally(new TableNotFoundException(tableName));
1509        return;
1510      }
1511      addListener(metaTable
1512        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1513          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1514            ClientMetaTableAccessor.QueryType.REGION))
1515          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1516            ClientMetaTableAccessor.QueryType.REGION))),
1517        (results, err2) -> {
1518          if (err2 != null) {
1519            future.completeExceptionally(err2);
1520            return;
1521          }
1522          if (results != null && !results.isEmpty()) {
1523            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1524            for (Result r : results) {
1525              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1526                continue;
1527              }
1528              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1529              if (rl != null) {
1530                for (HRegionLocation h : rl.getRegionLocations()) {
1531                  if (h != null && h.getServerName() != null) {
1532                    RegionInfo hri = h.getRegion();
1533                    if (
1534                      hri == null || hri.isSplitParent()
1535                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1536                    ) {
1537                      continue;
1538                    }
1539                    splitFutures.add(split(hri, null));
1540                  }
1541                }
1542              }
1543            }
1544            addListener(
1545              CompletableFuture
1546                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1547              (ret, exception) -> {
1548                if (exception != null) {
1549                  future.completeExceptionally(exception);
1550                  return;
1551                }
1552                future.complete(ret);
1553              });
1554          } else {
1555            future.complete(null);
1556          }
1557        });
1558    });
1559    return future;
1560  }
1561
1562  @Override
1563  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1564    CompletableFuture<Void> result = new CompletableFuture<>();
1565    if (splitPoint == null) {
1566      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1567    }
1568    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1569      (loc, err) -> {
1570        if (err != null) {
1571          result.completeExceptionally(err);
1572        } else if (loc == null || loc.getRegion() == null) {
1573          result.completeExceptionally(new IllegalArgumentException(
1574            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1575        } else {
1576          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1577            if (err2 != null) {
1578              result.completeExceptionally(err2);
1579            } else {
1580              result.complete(ret);
1581            }
1582
1583          });
1584        }
1585      });
1586    return result;
1587  }
1588
1589  @Override
1590  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1591    CompletableFuture<Void> future = new CompletableFuture<>();
1592    addListener(getRegionLocation(regionName), (location, err) -> {
1593      if (err != null) {
1594        future.completeExceptionally(err);
1595        return;
1596      }
1597      RegionInfo regionInfo = location.getRegion();
1598      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1599        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1600          + "Replicas are auto-split when their primary is split."));
1601        return;
1602      }
1603      ServerName serverName = location.getServerName();
1604      if (serverName == null) {
1605        future
1606          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1607        return;
1608      }
1609      addListener(split(regionInfo, null), (ret, err2) -> {
1610        if (err2 != null) {
1611          future.completeExceptionally(err2);
1612        } else {
1613          future.complete(ret);
1614        }
1615      });
1616    });
1617    return future;
1618  }
1619
1620  @Override
1621  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1622    Preconditions.checkNotNull(splitPoint,
1623      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1624    CompletableFuture<Void> future = new CompletableFuture<>();
1625    addListener(getRegionLocation(regionName), (location, err) -> {
1626      if (err != null) {
1627        future.completeExceptionally(err);
1628        return;
1629      }
1630      RegionInfo regionInfo = location.getRegion();
1631      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1632        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1633          + "Replicas are auto-split when their primary is split."));
1634        return;
1635      }
1636      ServerName serverName = location.getServerName();
1637      if (serverName == null) {
1638        future
1639          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1640        return;
1641      }
1642      if (
1643        regionInfo.getStartKey() != null
1644          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1645      ) {
1646        future.completeExceptionally(
1647          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1648        return;
1649      }
1650      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1651        if (err2 != null) {
1652          future.completeExceptionally(err2);
1653        } else {
1654          future.complete(ret);
1655        }
1656      });
1657    });
1658    return future;
1659  }
1660
1661  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1662    CompletableFuture<Void> future = new CompletableFuture<>();
1663    TableName tableName = hri.getTable();
1664    final SplitTableRegionRequest request;
1665    try {
1666      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1667        ng.newNonce());
1668    } catch (DeserializationException e) {
1669      future.completeExceptionally(e);
1670      return future;
1671    }
1672
1673    addListener(
1674      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1675        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1676      (ret, err2) -> {
1677        if (err2 != null) {
1678          future.completeExceptionally(err2);
1679        } else {
1680          future.complete(ret);
1681        }
1682      });
1683    return future;
1684  }
1685
1686  @Override
1687  public CompletableFuture<Void> truncateRegion(byte[] regionName) {
1688    CompletableFuture<Void> future = new CompletableFuture<>();
1689    addListener(getRegionLocation(regionName), (location, err) -> {
1690      if (err != null) {
1691        future.completeExceptionally(err);
1692        return;
1693      }
1694      RegionInfo regionInfo = location.getRegion();
1695      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1696        future.completeExceptionally(new IllegalArgumentException(
1697          "Can't truncate replicas directly.Replicas are auto-truncated "
1698            + "when their primary is truncated."));
1699        return;
1700      }
1701      ServerName serverName = location.getServerName();
1702      if (serverName == null) {
1703        future
1704          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1705        return;
1706      }
1707      addListener(truncateRegion(regionInfo), (ret, err2) -> {
1708        if (err2 != null) {
1709          future.completeExceptionally(err2);
1710        } else {
1711          future.complete(ret);
1712        }
1713      });
1714    });
1715    return future;
1716  }
1717
1718  private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
1719    CompletableFuture<Void> future = new CompletableFuture<>();
1720    TableName tableName = hri.getTable();
1721    final MasterProtos.TruncateRegionRequest request;
1722    try {
1723      request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
1724    } catch (DeserializationException e) {
1725      future.completeExceptionally(e);
1726      return future;
1727    }
1728    addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
1729      MasterProtos.TruncateRegionResponse::getProcId,
1730      new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1731        if (err2 != null) {
1732          future.completeExceptionally(err2);
1733        } else {
1734          future.complete(ret);
1735        }
1736      });
1737    return future;
1738  }
1739
1740  @Override
1741  public CompletableFuture<Void> assign(byte[] regionName) {
1742    CompletableFuture<Void> future = new CompletableFuture<>();
1743    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1744      if (err != null) {
1745        future.completeExceptionally(err);
1746        return;
1747      }
1748      addListener(
1749        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1750          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1751            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1752            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1753          .call(),
1754        (ret, err2) -> {
1755          if (err2 != null) {
1756            future.completeExceptionally(err2);
1757          } else {
1758            future.complete(ret);
1759          }
1760        });
1761    });
1762    return future;
1763  }
1764
1765  @Override
1766  public CompletableFuture<Void> unassign(byte[] regionName) {
1767    CompletableFuture<Void> future = new CompletableFuture<>();
1768    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1769      if (err != null) {
1770        future.completeExceptionally(err);
1771        return;
1772      }
1773      addListener(
1774        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1775          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1776            Void> call(controller, stub,
1777              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1778              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1779          .call(),
1780        (ret, err2) -> {
1781          if (err2 != null) {
1782            future.completeExceptionally(err2);
1783          } else {
1784            future.complete(ret);
1785          }
1786        });
1787    });
1788    return future;
1789  }
1790
1791  @Override
1792  public CompletableFuture<Void> offline(byte[] regionName) {
1793    CompletableFuture<Void> future = new CompletableFuture<>();
1794    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1795      if (err != null) {
1796        future.completeExceptionally(err);
1797        return;
1798      }
1799      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1800        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1801          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1802          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1803        .call(), (ret, err2) -> {
1804          if (err2 != null) {
1805            future.completeExceptionally(err2);
1806          } else {
1807            future.complete(ret);
1808          }
1809        });
1810    });
1811    return future;
1812  }
1813
1814  @Override
1815  public CompletableFuture<Void> move(byte[] regionName) {
1816    CompletableFuture<Void> future = new CompletableFuture<>();
1817    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1818      if (err != null) {
1819        future.completeExceptionally(err);
1820        return;
1821      }
1822      addListener(
1823        moveRegion(regionInfo,
1824          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1825        (ret, err2) -> {
1826          if (err2 != null) {
1827            future.completeExceptionally(err2);
1828          } else {
1829            future.complete(ret);
1830          }
1831        });
1832    });
1833    return future;
1834  }
1835
1836  @Override
1837  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1838    Preconditions.checkNotNull(destServerName,
1839      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1840    CompletableFuture<Void> future = new CompletableFuture<>();
1841    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1842      if (err != null) {
1843        future.completeExceptionally(err);
1844        return;
1845      }
1846      addListener(
1847        moveRegion(regionInfo, RequestConverter
1848          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1849        (ret, err2) -> {
1850          if (err2 != null) {
1851            future.completeExceptionally(err2);
1852          } else {
1853            future.complete(ret);
1854          }
1855        });
1856    });
1857    return future;
1858  }
1859
1860  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1861    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1862      .action(
1863        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1864          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1865      .call();
1866  }
1867
1868  @Override
1869  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1870    return this.<Void> newMasterCaller()
1871      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1872        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1873        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1874      .call();
1875  }
1876
1877  @Override
1878  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1879    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1880    Scan scan = QuotaTableUtil.makeScan(filter);
1881    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1882      new AdvancedScanResultConsumer() {
1883        List<QuotaSettings> settings = new ArrayList<>();
1884
1885        @Override
1886        public void onNext(Result[] results, ScanController controller) {
1887          for (Result result : results) {
1888            try {
1889              QuotaTableUtil.parseResultToCollection(result, settings);
1890            } catch (IOException e) {
1891              controller.terminate();
1892              future.completeExceptionally(e);
1893            }
1894          }
1895        }
1896
1897        @Override
1898        public void onError(Throwable error) {
1899          future.completeExceptionally(error);
1900        }
1901
1902        @Override
1903        public void onComplete() {
1904          future.complete(settings);
1905        }
1906      });
1907    return future;
1908  }
1909
1910  @Override
1911  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1912    boolean enabled) {
1913    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1914      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1915      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1916      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1917  }
1918
1919  @Override
1920  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1921    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1922      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1923      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1924      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1925  }
1926
1927  @Override
1928  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1929    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1930      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1931      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1932      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1933  }
1934
1935  @Override
1936  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1937    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1938      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1939      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1940      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1941  }
1942
1943  @Override
1944  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1945    return this.<ReplicationPeerConfig> newMasterCaller()
1946      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1947        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1948          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1949          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1950          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1951      .call();
1952  }
1953
1954  @Override
1955  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1956    ReplicationPeerConfig peerConfig) {
1957    return this.<UpdateReplicationPeerConfigRequest,
1958      UpdateReplicationPeerConfigResponse> procedureCall(
1959        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1960        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1961        (resp) -> resp.getProcId(),
1962        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1963  }
1964
1965  @Override
1966  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1967    SyncReplicationState clusterState) {
1968    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1969      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1970        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1971          clusterState),
1972        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1973        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1974          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1975  }
1976
1977  @Override
1978  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1979    Map<TableName, List<String>> tableCfs) {
1980    if (tableCfs == null) {
1981      return failedFuture(new ReplicationException("tableCfs is null"));
1982    }
1983
1984    CompletableFuture<Void> future = new CompletableFuture<>();
1985    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1986      if (!completeExceptionally(future, error)) {
1987        ReplicationPeerConfig newPeerConfig =
1988          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1989        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1990          if (!completeExceptionally(future, error)) {
1991            future.complete(result);
1992          }
1993        });
1994      }
1995    });
1996    return future;
1997  }
1998
1999  @Override
2000  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
2001    Map<TableName, List<String>> tableCfs) {
2002    if (tableCfs == null) {
2003      return failedFuture(new ReplicationException("tableCfs is null"));
2004    }
2005
2006    CompletableFuture<Void> future = new CompletableFuture<>();
2007    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
2008      if (!completeExceptionally(future, error)) {
2009        ReplicationPeerConfig newPeerConfig = null;
2010        try {
2011          newPeerConfig = ReplicationPeerConfigUtil
2012            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
2013        } catch (ReplicationException e) {
2014          future.completeExceptionally(e);
2015          return;
2016        }
2017        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
2018          if (!completeExceptionally(future, error)) {
2019            future.complete(result);
2020          }
2021        });
2022      }
2023    });
2024    return future;
2025  }
2026
2027  @Override
2028  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
2029    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
2030  }
2031
2032  @Override
2033  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
2034    Preconditions.checkNotNull(pattern,
2035      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
2036    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
2037  }
2038
2039  private CompletableFuture<List<ReplicationPeerDescription>>
2040    listReplicationPeers(ListReplicationPeersRequest request) {
2041    return this.<List<ReplicationPeerDescription>> newMasterCaller()
2042      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
2043        List<ReplicationPeerDescription>> call(controller, stub, request,
2044          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
2045          (resp) -> resp.getPeerDescList().stream()
2046            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
2047            .collect(Collectors.toList())))
2048      .call();
2049  }
2050
2051  @Override
2052  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
2053    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
2054    addListener(listTableDescriptors(), (tables, error) -> {
2055      if (!completeExceptionally(future, error)) {
2056        List<TableCFs> replicatedTableCFs = new ArrayList<>();
2057        tables.forEach(table -> {
2058          Map<String, Integer> cfs = new HashMap<>();
2059          Stream.of(table.getColumnFamilies())
2060            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
2061            .forEach(column -> {
2062              cfs.put(column.getNameAsString(), column.getScope());
2063            });
2064          if (!cfs.isEmpty()) {
2065            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
2066          }
2067        });
2068        future.complete(replicatedTableCFs);
2069      }
2070    });
2071    return future;
2072  }
2073
2074  @Override
2075  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
2076    SnapshotProtos.SnapshotDescription snapshot =
2077      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2078    try {
2079      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2080    } catch (IllegalArgumentException e) {
2081      return failedFuture(e);
2082    }
2083    CompletableFuture<Void> future = new CompletableFuture<>();
2084    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2085      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
2086    addListener(this.<SnapshotResponse> newMasterCaller()
2087      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
2088        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
2089      .call(), (resp, err) -> {
2090        if (err != null) {
2091          future.completeExceptionally(err);
2092          return;
2093        }
2094        waitSnapshotFinish(snapshotDesc, future, resp);
2095      });
2096    return future;
2097  }
2098
2099  // This is for keeping compatibility with old implementation.
2100  // If there is a procId field in the response, then the snapshot will be operated with a
2101  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
2102  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
2103    SnapshotResponse resp) {
2104    if (resp.hasProcId()) {
2105      getProcedureResult(resp.getProcId(), src -> null, future, 0);
2106      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
2107    } else {
2108      long expectedTimeout = resp.getExpectedTimeout();
2109      TimerTask pollingTask = new TimerTask() {
2110        int tries = 0;
2111        long startTime = EnvironmentEdgeManager.currentTime();
2112        long endTime = startTime + expectedTimeout;
2113        long maxPauseTime = expectedTimeout / maxAttempts;
2114
2115        @Override
2116        public void run(Timeout timeout) throws Exception {
2117          if (EnvironmentEdgeManager.currentTime() < endTime) {
2118            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
2119              if (err2 != null) {
2120                future.completeExceptionally(err2);
2121              } else if (done) {
2122                future.complete(null);
2123              } else {
2124                // retry again after pauseTime.
2125                long pauseTime =
2126                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2127                pauseTime = Math.min(pauseTime, maxPauseTime);
2128                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
2129              }
2130            });
2131          } else {
2132            future
2133              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
2134                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
2135          }
2136        }
2137      };
2138      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2139    }
2140  }
2141
2142  @Override
2143  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
2144    return this.<Boolean> newMasterCaller()
2145      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
2146        Boolean> call(controller, stub,
2147          IsSnapshotDoneRequest.newBuilder()
2148            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2149          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
2150      .call();
2151  }
2152
2153  @Override
2154  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
2155    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2156      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2157      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2158    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2159  }
2160
2161  @Override
2162  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2163    boolean restoreAcl) {
2164    CompletableFuture<Void> future = new CompletableFuture<>();
2165    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2166      if (err != null) {
2167        future.completeExceptionally(err);
2168        return;
2169      }
2170      TableName tableName = null;
2171      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2172        for (SnapshotDescription snap : snapshotDescriptions) {
2173          if (snap.getName().equals(snapshotName)) {
2174            tableName = snap.getTableName();
2175            break;
2176          }
2177        }
2178      }
2179      if (tableName == null) {
2180        future.completeExceptionally(
2181          new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist."));
2182        return;
2183      }
2184      final TableName finalTableName = tableName;
2185      addListener(tableExists(finalTableName), (exists, err2) -> {
2186        if (err2 != null) {
2187          future.completeExceptionally(err2);
2188        } else if (!exists) {
2189          // if table does not exist, then just clone snapshot into new table.
2190          completeConditionalOnFuture(future,
2191            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2192        } else {
2193          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2194            if (err4 != null) {
2195              future.completeExceptionally(err4);
2196            } else if (!disabled) {
2197              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2198            } else {
2199              completeConditionalOnFuture(future,
2200                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2201            }
2202          });
2203        }
2204      });
2205    });
2206    return future;
2207  }
2208
2209  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2210    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2211    if (takeFailSafeSnapshot) {
2212      CompletableFuture<Void> future = new CompletableFuture<>();
2213      // Step.1 Take a snapshot of the current state
2214      String failSafeSnapshotSnapshotNameFormat =
2215        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2216          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2217      final String failSafeSnapshotSnapshotName =
2218        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2219          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2220          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2221      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2222      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2223        if (err != null) {
2224          future.completeExceptionally(err);
2225        } else {
2226          // Step.2 Restore snapshot
2227          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2228            (void2, err2) -> {
2229              if (err2 != null) {
2230                // Step.3.a Something went wrong during the restore and try to rollback.
2231                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2232                  restoreAcl, null), (void3, err3) -> {
2233                    if (err3 != null) {
2234                      future.completeExceptionally(err3);
2235                    } else {
2236                      // If fail to restore snapshot but rollback successfully, delete the
2237                      // restore-failsafe snapshot.
2238                      LOG.info(
2239                        "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2240                      addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> {
2241                        if (err4 != null) {
2242                          LOG.error("Unable to remove the failsafe snapshot: {}",
2243                            failSafeSnapshotSnapshotName, err4);
2244                        }
2245                        String msg =
2246                          "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot="
2247                            + failSafeSnapshotSnapshotName + " succeeded.";
2248                        LOG.error(msg);
2249                        future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2250                      });
2251                    }
2252                  });
2253              } else {
2254                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2255                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2256                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2257                  if (err3 != null) {
2258                    LOG.error(
2259                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2260                      err3);
2261                  }
2262                  future.complete(ret3);
2263                });
2264              }
2265            });
2266        }
2267      });
2268      return future;
2269    } else {
2270      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2271    }
2272  }
2273
2274  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2275    CompletableFuture<T> parentFuture) {
2276    addListener(parentFuture, (res, err) -> {
2277      if (err != null) {
2278        dependentFuture.completeExceptionally(err);
2279      } else {
2280        dependentFuture.complete(res);
2281      }
2282    });
2283  }
2284
2285  @Override
2286  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2287    boolean restoreAcl, String customSFT) {
2288    CompletableFuture<Void> future = new CompletableFuture<>();
2289    addListener(tableExists(tableName), (exists, err) -> {
2290      if (err != null) {
2291        future.completeExceptionally(err);
2292      } else if (exists) {
2293        future.completeExceptionally(new TableExistsException(tableName));
2294      } else {
2295        completeConditionalOnFuture(future,
2296          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2297      }
2298    });
2299    return future;
2300  }
2301
2302  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2303    boolean restoreAcl, String customSFT) {
2304    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2305      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2306    try {
2307      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2308    } catch (IllegalArgumentException e) {
2309      return failedFuture(e);
2310    }
2311    RestoreSnapshotRequest.Builder builder =
2312      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2313        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2314    if (customSFT != null) {
2315      builder.setCustomSFT(customSFT);
2316    }
2317    return waitProcedureResult(this.<Long> newMasterCaller()
2318      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2319        Long> call(controller, stub, builder.build(),
2320          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2321      .call(), result -> null);
2322  }
2323
2324  @Override
2325  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2326    return getCompletedSnapshots(null);
2327  }
2328
2329  @Override
2330  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2331    Preconditions.checkNotNull(pattern,
2332      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2333    return getCompletedSnapshots(pattern);
2334  }
2335
2336  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2337    return this.<List<SnapshotDescription>> newMasterCaller()
2338      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2339        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2340          GetCompletedSnapshotsRequest.newBuilder().build(),
2341          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2342          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2343      .call();
2344  }
2345
2346  @Override
2347  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2348    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2349      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2350    return getCompletedSnapshots(tableNamePattern, null);
2351  }
2352
2353  @Override
2354  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2355    Pattern snapshotNamePattern) {
2356    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2357      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2358    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2359      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2360    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2361  }
2362
2363  private CompletableFuture<List<SnapshotDescription>>
2364    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2365    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2366    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2367      if (err != null) {
2368        future.completeExceptionally(err);
2369        return;
2370      }
2371      if (tableNames == null || tableNames.size() <= 0) {
2372        future.complete(Collections.emptyList());
2373        return;
2374      }
2375      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2376        if (err2 != null) {
2377          future.completeExceptionally(err2);
2378          return;
2379        }
2380        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2381          future.complete(Collections.emptyList());
2382          return;
2383        }
2384        future.complete(snapshotDescList.stream()
2385          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2386          .collect(Collectors.toList()));
2387      });
2388    });
2389    return future;
2390  }
2391
2392  @Override
2393  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2394    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2395  }
2396
2397  @Override
2398  public CompletableFuture<Void> deleteSnapshots() {
2399    return internalDeleteSnapshots(null, null);
2400  }
2401
2402  @Override
2403  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2404    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2405      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2406    return internalDeleteSnapshots(null, snapshotNamePattern);
2407  }
2408
2409  @Override
2410  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2411    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2412      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2413    return internalDeleteSnapshots(tableNamePattern, null);
2414  }
2415
2416  @Override
2417  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2418    Pattern snapshotNamePattern) {
2419    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2420      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2421    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2422      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2423    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2424  }
2425
2426  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2427    Pattern snapshotNamePattern) {
2428    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2429    if (tableNamePattern == null) {
2430      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2431    } else {
2432      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2433    }
2434    CompletableFuture<Void> future = new CompletableFuture<>();
2435    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2436      if (err != null) {
2437        future.completeExceptionally(err);
2438        return;
2439      }
2440      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2441        future.complete(null);
2442        return;
2443      }
2444      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2445        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2446          if (e != null) {
2447            future.completeExceptionally(e);
2448          } else {
2449            future.complete(v);
2450          }
2451        });
2452    });
2453    return future;
2454  }
2455
2456  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2457    return this.<Void> newMasterCaller()
2458      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2459        controller, stub,
2460        DeleteSnapshotRequest.newBuilder()
2461          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2462        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2463      .call();
2464  }
2465
2466  @Override
2467  public CompletableFuture<Void> execProcedure(String signature, String instance,
2468    Map<String, String> props) {
2469    CompletableFuture<Void> future = new CompletableFuture<>();
2470    ProcedureDescription procDesc =
2471      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2472    addListener(this.<Long> newMasterCaller()
2473      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2474        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2475        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2476      .call(), (expectedTimeout, err) -> {
2477        if (err != null) {
2478          future.completeExceptionally(err);
2479          return;
2480        }
2481        TimerTask pollingTask = new TimerTask() {
2482          int tries = 0;
2483          long startTime = EnvironmentEdgeManager.currentTime();
2484          long endTime = startTime + expectedTimeout;
2485          long maxPauseTime = expectedTimeout / maxAttempts;
2486
2487          @Override
2488          public void run(Timeout timeout) throws Exception {
2489            if (EnvironmentEdgeManager.currentTime() < endTime) {
2490              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2491                if (err2 != null) {
2492                  future.completeExceptionally(err2);
2493                  return;
2494                }
2495                if (done) {
2496                  future.complete(null);
2497                } else {
2498                  // retry again after pauseTime.
2499                  long pauseTime =
2500                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2501                  pauseTime = Math.min(pauseTime, maxPauseTime);
2502                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2503                    TimeUnit.MICROSECONDS);
2504                }
2505              });
2506            } else {
2507              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2508                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2509            }
2510          }
2511        };
2512        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2513        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2514      });
2515    return future;
2516  }
2517
2518  @Override
2519  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2520    Map<String, String> props) {
2521    ProcedureDescription proDesc =
2522      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2523    return this.<byte[]> newMasterCaller()
2524      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2525        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2526        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2527        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2528      .call();
2529  }
2530
2531  @Override
2532  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2533    Map<String, String> props) {
2534    ProcedureDescription proDesc =
2535      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2536    return this.<Boolean> newMasterCaller()
2537      .action(
2538        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2539          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2540          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2541      .call();
2542  }
2543
2544  @Override
2545  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2546    return this.<Boolean> newMasterCaller().action(
2547      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2548        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2549        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2550      .call();
2551  }
2552
2553  @Override
2554  public CompletableFuture<String> getProcedures() {
2555    return this.<String> newMasterCaller()
2556      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2557        controller, stub, GetProceduresRequest.newBuilder().build(),
2558        (s, c, req, done) -> s.getProcedures(c, req, done),
2559        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2560      .call();
2561  }
2562
2563  @Override
2564  public CompletableFuture<String> getLocks() {
2565    return this.<String> newMasterCaller()
2566      .action(
2567        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2568          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2569          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2570      .call();
2571  }
2572
2573  @Override
2574  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2575    boolean offload) {
2576    return this.<Void> newMasterCaller()
2577      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2578        DecommissionRegionServersResponse, Void> call(controller, stub,
2579          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2580          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2581      .call();
2582  }
2583
2584  @Override
2585  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2586    return this.<List<ServerName>> newMasterCaller()
2587      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2588        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2589          ListDecommissionedRegionServersRequest.newBuilder().build(),
2590          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2591          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2592            .collect(Collectors.toList())))
2593      .call();
2594  }
2595
2596  @Override
2597  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2598    List<byte[]> encodedRegionNames) {
2599    return this.<Void> newMasterCaller()
2600      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2601        RecommissionRegionServerResponse, Void> call(controller, stub,
2602          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2603          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2604      .call();
2605  }
2606
2607  /**
2608   * Get the region location for the passed region name. The region name may be a full region name
2609   * or encoded region name. If the region does not found, then it'll throw an
2610   * UnknownRegionException wrapped by a {@link CompletableFuture}
2611   * @param regionNameOrEncodedRegionName region name or encoded region name
2612   * @return region location, wrapped by a {@link CompletableFuture}
2613   */
2614  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2615    if (regionNameOrEncodedRegionName == null) {
2616      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2617    }
2618
2619    CompletableFuture<Optional<HRegionLocation>> future;
2620    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2621      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2622      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2623        // old format encodedName, should be meta region
2624        future = connection.registry.getMetaRegionLocations()
2625          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2626            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2627      } else {
2628        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2629          regionNameOrEncodedRegionName);
2630      }
2631    } else {
2632      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2633      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2634      RegionInfo regionInfo;
2635      try {
2636        regionInfo =
2637          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2638      } catch (IOException ioe) {
2639        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2640      }
2641
2642      if (regionInfo.isMetaRegion()) {
2643        future = connection.registry.getMetaRegionLocations()
2644          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2645            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2646            .findFirst());
2647      } else {
2648        future =
2649          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2650      }
2651    }
2652
2653    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2654    addListener(future, (location, err) -> {
2655      if (err != null) {
2656        returnedFuture.completeExceptionally(err);
2657        return;
2658      }
2659      if (!location.isPresent() || location.get().getRegion() == null) {
2660        returnedFuture.completeExceptionally(
2661          new UnknownRegionException("Invalid region name or encoded region name: "
2662            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2663      } else {
2664        returnedFuture.complete(location.get());
2665      }
2666    });
2667    return returnedFuture;
2668  }
2669
2670  /**
2671   * Get the region info for the passed region name. The region name may be a full region name or
2672   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2673   * wrapped by a {@link CompletableFuture}
2674   * @return region info, wrapped by a {@link CompletableFuture}
2675   */
2676  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2677    if (regionNameOrEncodedRegionName == null) {
2678      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2679    }
2680
2681    if (
2682      Bytes.equals(regionNameOrEncodedRegionName,
2683        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2684        || Bytes.equals(regionNameOrEncodedRegionName,
2685          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2686    ) {
2687      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2688    }
2689
2690    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2691    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2692      if (err != null) {
2693        future.completeExceptionally(err);
2694      } else {
2695        future.complete(location.getRegion());
2696      }
2697    });
2698    return future;
2699  }
2700
2701  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2702    if (numRegions < 3) {
2703      throw new IllegalArgumentException("Must create at least three regions");
2704    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2705      throw new IllegalArgumentException("Start key must be smaller than end key");
2706    }
2707    if (numRegions == 3) {
2708      return new byte[][] { startKey, endKey };
2709    }
2710    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2711    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2712      throw new IllegalArgumentException("Unable to split key range into enough regions");
2713    }
2714    return splitKeys;
2715  }
2716
2717  private void verifySplitKeys(byte[][] splitKeys) {
2718    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2719    // Verify there are no duplicate split keys
2720    byte[] lastKey = null;
2721    for (byte[] splitKey : splitKeys) {
2722      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2723        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2724      }
2725      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2726        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2727          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2728      }
2729      lastKey = splitKey;
2730    }
2731  }
2732
2733  private static abstract class ProcedureBiConsumer<T> implements BiConsumer<T, Throwable> {
2734
2735    abstract void onFinished();
2736
2737    abstract void onError(Throwable error);
2738
2739    @Override
2740    public void accept(T value, Throwable error) {
2741      if (error != null) {
2742        onError(error);
2743        return;
2744      }
2745      onFinished();
2746    }
2747  }
2748
2749  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer<Void> {
2750    protected final TableName tableName;
2751
2752    TableProcedureBiConsumer(TableName tableName) {
2753      this.tableName = tableName;
2754    }
2755
2756    abstract String getOperationType();
2757
2758    String getDescription() {
2759      return "Operation: " + getOperationType() + ", " + "Table Name: "
2760        + tableName.getNameWithNamespaceInclAsString();
2761    }
2762
2763    @Override
2764    void onFinished() {
2765      LOG.info(getDescription() + " completed");
2766    }
2767
2768    @Override
2769    void onError(Throwable error) {
2770      LOG.info(getDescription() + " failed with " + error.getMessage());
2771    }
2772  }
2773
2774  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer<Void> {
2775    protected final String namespaceName;
2776
2777    NamespaceProcedureBiConsumer(String namespaceName) {
2778      this.namespaceName = namespaceName;
2779    }
2780
2781    abstract String getOperationType();
2782
2783    String getDescription() {
2784      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2785    }
2786
2787    @Override
2788    void onFinished() {
2789      LOG.info("{} completed", getDescription());
2790    }
2791
2792    @Override
2793    void onError(Throwable error) {
2794      LOG.info("{} failed with {}", getDescription(), error.getMessage());
2795    }
2796  }
2797
2798  private static class RestoreBackupSystemTableProcedureBiConsumer extends ProcedureBiConsumer {
2799
2800    @Override
2801    void onFinished() {
2802      LOG.info("RestoreBackupSystemTableProcedure completed");
2803    }
2804
2805    @Override
2806    void onError(Throwable error) {
2807      LOG.info("RestoreBackupSystemTableProcedure failed with {}", error.getMessage());
2808    }
2809  }
2810
2811  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2812
2813    CreateTableProcedureBiConsumer(TableName tableName) {
2814      super(tableName);
2815    }
2816
2817    @Override
2818    String getOperationType() {
2819      return "CREATE";
2820    }
2821  }
2822
2823  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2824
2825    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2826      super(tableName);
2827    }
2828
2829    @Override
2830    String getOperationType() {
2831      return "MODIFY";
2832    }
2833  }
2834
2835  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2836    extends TableProcedureBiConsumer {
2837
2838    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2839      super(tableName);
2840    }
2841
2842    @Override
2843    String getOperationType() {
2844      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2845    }
2846  }
2847
2848  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2849
2850    DeleteTableProcedureBiConsumer(TableName tableName) {
2851      super(tableName);
2852    }
2853
2854    @Override
2855    String getOperationType() {
2856      return "DELETE";
2857    }
2858
2859    @Override
2860    void onFinished() {
2861      connection.getLocator().clearCache(this.tableName);
2862      super.onFinished();
2863    }
2864  }
2865
2866  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2867
2868    TruncateTableProcedureBiConsumer(TableName tableName) {
2869      super(tableName);
2870    }
2871
2872    @Override
2873    String getOperationType() {
2874      return "TRUNCATE";
2875    }
2876  }
2877
2878  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2879
2880    EnableTableProcedureBiConsumer(TableName tableName) {
2881      super(tableName);
2882    }
2883
2884    @Override
2885    String getOperationType() {
2886      return "ENABLE";
2887    }
2888  }
2889
2890  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2891
2892    DisableTableProcedureBiConsumer(TableName tableName) {
2893      super(tableName);
2894    }
2895
2896    @Override
2897    String getOperationType() {
2898      return "DISABLE";
2899    }
2900  }
2901
2902  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2903
2904    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2905      super(tableName);
2906    }
2907
2908    @Override
2909    String getOperationType() {
2910      return "ADD_COLUMN_FAMILY";
2911    }
2912  }
2913
2914  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2915
2916    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2917      super(tableName);
2918    }
2919
2920    @Override
2921    String getOperationType() {
2922      return "DELETE_COLUMN_FAMILY";
2923    }
2924  }
2925
2926  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2927
2928    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2929      super(tableName);
2930    }
2931
2932    @Override
2933    String getOperationType() {
2934      return "MODIFY_COLUMN_FAMILY";
2935    }
2936  }
2937
2938  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2939    extends TableProcedureBiConsumer {
2940
2941    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2942      super(tableName);
2943    }
2944
2945    @Override
2946    String getOperationType() {
2947      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2948    }
2949  }
2950
2951  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
2952
2953    FlushTableProcedureBiConsumer(TableName tableName) {
2954      super(tableName);
2955    }
2956
2957    @Override
2958    String getOperationType() {
2959      return "FLUSH";
2960    }
2961  }
2962
2963  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2964
2965    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2966      super(namespaceName);
2967    }
2968
2969    @Override
2970    String getOperationType() {
2971      return "CREATE_NAMESPACE";
2972    }
2973  }
2974
2975  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2976
2977    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2978      super(namespaceName);
2979    }
2980
2981    @Override
2982    String getOperationType() {
2983      return "DELETE_NAMESPACE";
2984    }
2985  }
2986
2987  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2988
2989    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2990      super(namespaceName);
2991    }
2992
2993    @Override
2994    String getOperationType() {
2995      return "MODIFY_NAMESPACE";
2996    }
2997  }
2998
2999  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
3000
3001    MergeTableRegionProcedureBiConsumer(TableName tableName) {
3002      super(tableName);
3003    }
3004
3005    @Override
3006    String getOperationType() {
3007      return "MERGE_REGIONS";
3008    }
3009  }
3010
3011  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
3012
3013    SplitTableRegionProcedureBiConsumer(TableName tableName) {
3014      super(tableName);
3015    }
3016
3017    @Override
3018    String getOperationType() {
3019      return "SPLIT_REGION";
3020    }
3021  }
3022
3023  private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {
3024
3025    TruncateRegionProcedureBiConsumer(TableName tableName) {
3026      super(tableName);
3027    }
3028
3029    @Override
3030    String getOperationType() {
3031      return "TRUNCATE_REGION";
3032    }
3033  }
3034
3035  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
3036    SnapshotProcedureBiConsumer(TableName tableName) {
3037      super(tableName);
3038    }
3039
3040    @Override
3041    String getOperationType() {
3042      return "SNAPSHOT";
3043    }
3044  }
3045
3046  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer<Void> {
3047    private final String peerId;
3048    private final Supplier<String> getOperation;
3049
3050    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
3051      this.peerId = peerId;
3052      this.getOperation = getOperation;
3053    }
3054
3055    String getDescription() {
3056      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
3057    }
3058
3059    @Override
3060    void onFinished() {
3061      LOG.info("{} completed", getDescription());
3062    }
3063
3064    @Override
3065    void onError(Throwable error) {
3066      LOG.info("{} failed with {}", getDescription(), error.getMessage());
3067    }
3068  }
3069
3070  private static final class RollAllWALWritersBiConsumer
3071    extends ProcedureBiConsumer<Map<ServerName, Long>> {
3072
3073    @Override
3074    void onFinished() {
3075      LOG.info("Rolling all WAL writers completed");
3076    }
3077
3078    @Override
3079    void onError(Throwable error) {
3080      LOG.warn("Rolling all WAL writers failed with {}", error.getMessage());
3081    }
3082  }
3083
3084  private <T> CompletableFuture<T> waitProcedureResult(CompletableFuture<Long> procFuture,
3085    Converter<T, ByteString> converter) {
3086    CompletableFuture<T> future = new CompletableFuture<>();
3087    addListener(procFuture, (procId, error) -> {
3088      if (error != null) {
3089        future.completeExceptionally(error);
3090        return;
3091      }
3092      getProcedureResult(procId, converter, future, 0);
3093    });
3094    return future;
3095  }
3096
3097  private <T> void getProcedureResult(long procId, Converter<T, ByteString> converter,
3098    CompletableFuture<T> future, int retries) {
3099    addListener(
3100      this.<GetProcedureResultResponse> newMasterCaller()
3101        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
3102          GetProcedureResultResponse> call(controller, stub,
3103            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
3104            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
3105        .call(),
3106      (response, error) -> {
3107        if (error != null) {
3108          LOG.warn("failed to get the procedure result procId={}", procId,
3109            ConnectionUtils.translateException(error));
3110          retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1),
3111            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3112          return;
3113        }
3114        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
3115          retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1),
3116            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3117          return;
3118        }
3119        if (response.hasException()) {
3120          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
3121          future.completeExceptionally(ioe);
3122        } else {
3123          try {
3124            future.complete(converter.convert(response.getResult()));
3125          } catch (IOException e) {
3126            future.completeExceptionally(e);
3127          }
3128        }
3129      });
3130  }
3131
3132  private <T> CompletableFuture<T> failedFuture(Throwable error) {
3133    CompletableFuture<T> future = new CompletableFuture<>();
3134    future.completeExceptionally(error);
3135    return future;
3136  }
3137
3138  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
3139    if (error != null) {
3140      future.completeExceptionally(error);
3141      return true;
3142    }
3143    return false;
3144  }
3145
3146  @Override
3147  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
3148    return getClusterMetrics(EnumSet.allOf(Option.class));
3149  }
3150
3151  @Override
3152  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
3153    return this.<ClusterMetrics> newMasterCaller()
3154      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
3155        ClusterMetrics> call(controller, stub,
3156          RequestConverter.buildGetClusterStatusRequest(options),
3157          (s, c, req, done) -> s.getClusterStatus(c, req, done),
3158          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
3159      .call();
3160  }
3161
3162  @Override
3163  public CompletableFuture<Void> shutdown() {
3164    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3165      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
3166        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
3167        resp -> null))
3168      .call();
3169  }
3170
3171  @Override
3172  public CompletableFuture<Void> stopMaster() {
3173    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3174      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
3175        controller, stub, StopMasterRequest.newBuilder().build(),
3176        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
3177      .call();
3178  }
3179
3180  @Override
3181  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
3182    StopServerRequest request = RequestConverter
3183      .buildStopServerRequest("Called by admin client " + this.connection.toString());
3184    return this.<Void> newAdminCaller().priority(HIGH_QOS)
3185      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
3186        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
3187        resp -> null))
3188      .serverName(serverName).call();
3189  }
3190
3191  @Override
3192  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
3193    return this.<Void> newAdminCaller()
3194      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
3195        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
3196          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
3197      .serverName(serverName).call();
3198  }
3199
3200  @Override
3201  public CompletableFuture<Void> updateConfiguration() {
3202    CompletableFuture<Void> future = new CompletableFuture<Void>();
3203    addListener(
3204      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
3205      (status, err) -> {
3206        if (err != null) {
3207          future.completeExceptionally(err);
3208        } else {
3209          List<CompletableFuture<Void>> futures = new ArrayList<>();
3210          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
3211          futures.add(updateConfiguration(status.getMasterName()));
3212          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
3213          addListener(
3214            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3215            (result, err2) -> {
3216              if (err2 != null) {
3217                future.completeExceptionally(err2);
3218              } else {
3219                future.complete(result);
3220              }
3221            });
3222        }
3223      });
3224    return future;
3225  }
3226
3227  @Override
3228  public CompletableFuture<Void> updateConfiguration(String groupName) {
3229    CompletableFuture<Void> future = new CompletableFuture<Void>();
3230    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3231      if (err != null) {
3232        future.completeExceptionally(err);
3233      } else if (rsGroupInfo == null) {
3234        future.completeExceptionally(
3235          new IllegalArgumentException("Group does not exist: " + groupName));
3236      } else {
3237        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3238          if (err2 != null) {
3239            future.completeExceptionally(err2);
3240          } else {
3241            List<CompletableFuture<Void>> futures = new ArrayList<>();
3242            List<ServerName> groupServers = status.getServersName().stream()
3243              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3244            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3245            addListener(
3246              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3247              (result, err3) -> {
3248                if (err3 != null) {
3249                  future.completeExceptionally(err3);
3250                } else {
3251                  future.complete(result);
3252                }
3253              });
3254          }
3255        });
3256      }
3257    });
3258    return future;
3259  }
3260
3261  @Override
3262  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3263    return this.<Void> newAdminCaller()
3264      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3265        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3266          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3267      .serverName(serverName).call();
3268  }
3269
3270  @Override
3271  public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() {
3272    return this
3273      .<RollAllWALWritersRequest, RollAllWALWritersResponse,
3274        Map<ServerName,
3275          Long>> procedureCall(
3276            RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(), ng.newNonce()),
3277            (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp -> resp.getProcId(),
3278            result -> LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap()
3279              .entrySet().stream().collect(Collectors
3280                .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()), Map.Entry::getValue)),
3281            new RollAllWALWritersBiConsumer());
3282  }
3283
3284  @Override
3285  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3286    return this.<Void> newAdminCaller()
3287      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3288        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3289          RequestConverter.buildClearCompactionQueuesRequest(queues),
3290          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3291      .serverName(serverName).call();
3292  }
3293
3294  @Override
3295  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3296    return this.<List<SecurityCapability>> newMasterCaller()
3297      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3298        List<SecurityCapability>> call(controller, stub,
3299          SecurityCapabilitiesRequest.newBuilder().build(),
3300          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3301          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3302      .call();
3303  }
3304
3305  @Override
3306  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3307    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3308  }
3309
3310  @Override
3311  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3312    TableName tableName) {
3313    Preconditions.checkNotNull(tableName,
3314      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3315    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3316  }
3317
3318  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3319    ServerName serverName) {
3320    return this.<List<RegionMetrics>> newAdminCaller()
3321      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3322        List<RegionMetrics>> adminCall(controller, stub, request,
3323          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3324          RegionMetricsBuilder::toRegionMetrics))
3325      .serverName(serverName).call();
3326  }
3327
3328  @Override
3329  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3330    return this.<Boolean> newMasterCaller()
3331      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3332        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3333          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3334          resp -> resp.getInMaintenanceMode()))
3335      .call();
3336  }
3337
3338  @Override
3339  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
3340    CompactType compactType) {
3341    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3342
3343    switch (compactType) {
3344      case MOB:
3345        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
3346          if (err != null) {
3347            future.completeExceptionally(err);
3348            return;
3349          }
3350          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
3351
3352          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
3353            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3354              GetRegionInfoResponse> adminCall(controller, stub,
3355                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
3356                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3357            .call(), (resp2, err2) -> {
3358              if (err2 != null) {
3359                future.completeExceptionally(err2);
3360              } else {
3361                if (resp2.hasCompactionState()) {
3362                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3363                } else {
3364                  future.complete(CompactionState.NONE);
3365                }
3366              }
3367            });
3368        });
3369        break;
3370      case NORMAL:
3371        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3372          if (err != null) {
3373            future.completeExceptionally(err);
3374            return;
3375          }
3376          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
3377          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
3378          locations.stream().filter(loc -> loc.getServerName() != null)
3379            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
3380            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
3381              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
3382                // If any region compaction state is MAJOR_AND_MINOR
3383                // the table compaction state is MAJOR_AND_MINOR, too.
3384                if (err2 != null) {
3385                  future.completeExceptionally(unwrapCompletionException(err2));
3386                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
3387                  future.complete(regionState);
3388                } else {
3389                  regionStates.add(regionState);
3390                }
3391              }));
3392            });
3393          addListener(
3394            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3395            (ret, err3) -> {
3396              // If future not completed, check all regions's compaction state
3397              if (!future.isCompletedExceptionally() && !future.isDone()) {
3398                CompactionState state = CompactionState.NONE;
3399                for (CompactionState regionState : regionStates) {
3400                  switch (regionState) {
3401                    case MAJOR:
3402                      if (state == CompactionState.MINOR) {
3403                        future.complete(CompactionState.MAJOR_AND_MINOR);
3404                      } else {
3405                        state = CompactionState.MAJOR;
3406                      }
3407                      break;
3408                    case MINOR:
3409                      if (state == CompactionState.MAJOR) {
3410                        future.complete(CompactionState.MAJOR_AND_MINOR);
3411                      } else {
3412                        state = CompactionState.MINOR;
3413                      }
3414                      break;
3415                    case NONE:
3416                    default:
3417                  }
3418                }
3419                if (!future.isDone()) {
3420                  future.complete(state);
3421                }
3422              }
3423            });
3424        });
3425        break;
3426      default:
3427        throw new IllegalArgumentException("Unknown compactType: " + compactType);
3428    }
3429
3430    return future;
3431  }
3432
3433  @Override
3434  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3435    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3436    addListener(getRegionLocation(regionName), (location, err) -> {
3437      if (err != null) {
3438        future.completeExceptionally(err);
3439        return;
3440      }
3441      ServerName serverName = location.getServerName();
3442      if (serverName == null) {
3443        future
3444          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3445        return;
3446      }
3447      addListener(
3448        this.<GetRegionInfoResponse> newAdminCaller()
3449          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3450            GetRegionInfoResponse> adminCall(controller, stub,
3451              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3452                true),
3453              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3454          .serverName(serverName).call(),
3455        (resp2, err2) -> {
3456          if (err2 != null) {
3457            future.completeExceptionally(err2);
3458          } else {
3459            if (resp2.hasCompactionState()) {
3460              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3461            } else {
3462              future.complete(CompactionState.NONE);
3463            }
3464          }
3465        });
3466    });
3467    return future;
3468  }
3469
3470  @Override
3471  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3472    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3473      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3474    return this.<Optional<Long>> newMasterCaller()
3475      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3476        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3477          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3478          ProtobufUtil::toOptionalTimestamp))
3479      .call();
3480  }
3481
3482  @Override
3483  public CompletableFuture<Optional<Long>>
3484    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3485    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3486    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3487    addListener(getRegionInfo(regionName), (region, err) -> {
3488      if (err != null) {
3489        future.completeExceptionally(err);
3490        return;
3491      }
3492      MajorCompactionTimestampForRegionRequest.Builder builder =
3493        MajorCompactionTimestampForRegionRequest.newBuilder();
3494      builder.setRegion(
3495        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3496      addListener(this.<Optional<Long>> newMasterCaller()
3497        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3498          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3499            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3500            ProtobufUtil::toOptionalTimestamp))
3501        .call(), (timestamp, err2) -> {
3502          if (err2 != null) {
3503            future.completeExceptionally(err2);
3504          } else {
3505            future.complete(timestamp);
3506          }
3507        });
3508    });
3509    return future;
3510  }
3511
3512  @Override
3513  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3514    List<String> serverNamesList) {
3515    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3516    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3517      if (err != null) {
3518        future.completeExceptionally(err);
3519        return;
3520      }
3521      // Accessed by multiple threads.
3522      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3523      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3524      serverNames.stream().forEach(serverName -> {
3525        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3526          if (err2 != null) {
3527            future.completeExceptionally(unwrapCompletionException(err2));
3528          } else {
3529            serverStates.put(serverName, serverState);
3530          }
3531        }));
3532      });
3533      addListener(
3534        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3535        (ret, err3) -> {
3536          if (!future.isCompletedExceptionally()) {
3537            if (err3 != null) {
3538              future.completeExceptionally(err3);
3539            } else {
3540              future.complete(serverStates);
3541            }
3542          }
3543        });
3544    });
3545    return future;
3546  }
3547
3548  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3549    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3550    if (serverNamesList.isEmpty()) {
3551      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3552        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3553      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3554        if (err != null) {
3555          future.completeExceptionally(err);
3556        } else {
3557          future.complete(clusterMetrics.getServersName());
3558        }
3559      });
3560      return future;
3561    } else {
3562      List<ServerName> serverList = new ArrayList<>();
3563      for (String regionServerName : serverNamesList) {
3564        ServerName serverName = null;
3565        try {
3566          serverName = ServerName.valueOf(regionServerName);
3567        } catch (Exception e) {
3568          future.completeExceptionally(
3569            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3570        }
3571        if (serverName == null) {
3572          future.completeExceptionally(
3573            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3574        } else {
3575          serverList.add(serverName);
3576        }
3577      }
3578      future.complete(serverList);
3579    }
3580    return future;
3581  }
3582
3583  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3584    return this.<Boolean> newAdminCaller().serverName(serverName)
3585      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3586        Boolean> adminCall(controller, stub,
3587          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3588          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3589      .call();
3590  }
3591
3592  @Override
3593  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3594    return this.<Boolean> newMasterCaller()
3595      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3596        Boolean> call(controller, stub,
3597          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3598          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3599          (resp) -> resp.getPrevBalanceValue()))
3600      .call();
3601  }
3602
3603  @Override
3604  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3605    return this.<BalanceResponse> newMasterCaller()
3606      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3607        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3608          (s, c, req, done) -> s.balance(c, req, done),
3609          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3610      .call();
3611  }
3612
3613  @Override
3614  public CompletableFuture<Boolean> isBalancerEnabled() {
3615    return this.<Boolean> newMasterCaller()
3616      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3617        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3618          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3619      .call();
3620  }
3621
3622  @Override
3623  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3624    return this.<Boolean> newMasterCaller()
3625      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3626        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3627          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3628          (resp) -> resp.getPrevNormalizerValue()))
3629      .call();
3630  }
3631
3632  @Override
3633  public CompletableFuture<Boolean> isNormalizerEnabled() {
3634    return this.<Boolean> newMasterCaller()
3635      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3636        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3637          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3638      .call();
3639  }
3640
3641  @Override
3642  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3643    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3644  }
3645
3646  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3647    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3648      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3649  }
3650
3651  @Override
3652  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3653    return this.<Boolean> newMasterCaller()
3654      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3655        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3656          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3657          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3658          (resp) -> resp.getPrevValue()))
3659      .call();
3660  }
3661
3662  @Override
3663  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3664    return this.<Boolean> newMasterCaller()
3665      .action(
3666        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3667          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3668            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3669      .call();
3670  }
3671
3672  @Override
3673  public CompletableFuture<Boolean> runCleanerChore() {
3674    return this.<Boolean> newMasterCaller()
3675      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3676        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3677          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3678          (resp) -> resp.getCleanerChoreRan()))
3679      .call();
3680  }
3681
3682  @Override
3683  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3684    return this.<Boolean> newMasterCaller()
3685      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3686        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3687          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3688      .call();
3689  }
3690
3691  @Override
3692  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3693    return this.<Boolean> newMasterCaller()
3694      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3695        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3696          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3697          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3698      .call();
3699  }
3700
3701  @Override
3702  public CompletableFuture<Integer> runCatalogJanitor() {
3703    return this.<Integer> newMasterCaller()
3704      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3705        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3706          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3707      .call();
3708  }
3709
3710  @Override
3711  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3712    ServiceCaller<S, R> callable) {
3713    MasterCoprocessorRpcChannelImpl channel =
3714      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3715    S stub = stubMaker.apply(channel);
3716    CompletableFuture<R> future = new CompletableFuture<>();
3717    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3718    callable.call(stub, controller, resp -> {
3719      if (controller.failed()) {
3720        future.completeExceptionally(controller.getFailed());
3721      } else {
3722        future.complete(resp);
3723      }
3724    });
3725    return future;
3726  }
3727
3728  @Override
3729  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3730    ServiceCaller<S, R> callable, ServerName serverName) {
3731    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3732      this.<Message> newServerCaller().serverName(serverName));
3733    S stub = stubMaker.apply(channel);
3734    CompletableFuture<R> future = new CompletableFuture<>();
3735    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3736    callable.call(stub, controller, resp -> {
3737      if (controller.failed()) {
3738        future.completeExceptionally(controller.getFailed());
3739      } else {
3740        future.complete(resp);
3741      }
3742    });
3743    return future;
3744  }
3745
3746  @Override
3747  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3748    return this.<List<ServerName>> newMasterCaller()
3749      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3750        List<ServerName>> call(controller, stub,
3751          RequestConverter.buildClearDeadServersRequest(servers),
3752          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3753          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3754      .call();
3755  }
3756
3757  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3758    return this.connection.callerFactory.<T> serverRequest()
3759      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3760      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3761      .pause(pauseNs, TimeUnit.NANOSECONDS)
3762      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3763      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3764  }
3765
3766  @Override
3767  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3768    if (tableName == null) {
3769      return failedFuture(new IllegalArgumentException("Table name is null"));
3770    }
3771    CompletableFuture<Void> future = new CompletableFuture<>();
3772    addListener(tableExists(tableName), (exist, err) -> {
3773      if (err != null) {
3774        future.completeExceptionally(err);
3775        return;
3776      }
3777      if (!exist) {
3778        future.completeExceptionally(new TableNotFoundException(
3779          "Table '" + tableName.getNameAsString() + "' does not exists."));
3780        return;
3781      }
3782      addListener(getTableSplits(tableName), (splits, err1) -> {
3783        if (err1 != null) {
3784          future.completeExceptionally(err1);
3785        } else {
3786          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3787            if (err2 != null) {
3788              future.completeExceptionally(err2);
3789            } else {
3790              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3791                if (err3 != null) {
3792                  future.completeExceptionally(err3);
3793                } else {
3794                  future.complete(result3);
3795                }
3796              });
3797            }
3798          });
3799        }
3800      });
3801    });
3802    return future;
3803  }
3804
3805  @Override
3806  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3807    if (tableName == null) {
3808      return failedFuture(new IllegalArgumentException("Table name is null"));
3809    }
3810    CompletableFuture<Void> future = new CompletableFuture<>();
3811    addListener(tableExists(tableName), (exist, err) -> {
3812      if (err != null) {
3813        future.completeExceptionally(err);
3814        return;
3815      }
3816      if (!exist) {
3817        future.completeExceptionally(new TableNotFoundException(
3818          "Table '" + tableName.getNameAsString() + "' does not exists."));
3819        return;
3820      }
3821      addListener(setTableReplication(tableName, false), (result, err2) -> {
3822        if (err2 != null) {
3823          future.completeExceptionally(err2);
3824        } else {
3825          future.complete(result);
3826        }
3827      });
3828    });
3829    return future;
3830  }
3831
3832  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3833    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3834    addListener(
3835      getRegions(tableName).thenApply(regions -> regions.stream()
3836        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3837      (regions, err2) -> {
3838        if (err2 != null) {
3839          future.completeExceptionally(err2);
3840          return;
3841        }
3842        if (regions.size() == 1) {
3843          future.complete(null);
3844        } else {
3845          byte[][] splits = new byte[regions.size() - 1][];
3846          for (int i = 1; i < regions.size(); i++) {
3847            splits[i - 1] = regions.get(i).getStartKey();
3848          }
3849          future.complete(splits);
3850        }
3851      });
3852    return future;
3853  }
3854
3855  /**
3856   * Connect to peer and check the table descriptor on peer:
3857   * <ol>
3858   * <li>Create the same table on peer when not exist.</li>
3859   * <li>Throw an exception if the table already has replication enabled on any of the column
3860   * families.</li>
3861   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3862   * </ol>
3863   * @param tableName name of the table to sync to the peer
3864   * @param splits    table split keys
3865   */
3866  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3867    byte[][] splits) {
3868    CompletableFuture<Void> future = new CompletableFuture<>();
3869    addListener(listReplicationPeers(), (peers, err) -> {
3870      if (err != null) {
3871        future.completeExceptionally(err);
3872        return;
3873      }
3874      if (peers == null || peers.size() <= 0) {
3875        future.completeExceptionally(
3876          new IllegalArgumentException("Found no peer cluster for replication."));
3877        return;
3878      }
3879      List<CompletableFuture<Void>> futures = new ArrayList<>();
3880      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3881        .forEach(peer -> {
3882          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3883        });
3884      addListener(
3885        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3886        (result, err2) -> {
3887          if (err2 != null) {
3888            future.completeExceptionally(err2);
3889          } else {
3890            future.complete(result);
3891          }
3892        });
3893    });
3894    return future;
3895  }
3896
3897  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3898    ReplicationPeerDescription peer) {
3899    Configuration peerConf;
3900    try {
3901      peerConf = ReplicationPeerConfigUtil
3902        .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig());
3903    } catch (IOException e) {
3904      return failedFuture(e);
3905    }
3906    URI connectionUri =
3907      ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey());
3908    CompletableFuture<Void> future = new CompletableFuture<>();
3909    addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> {
3910      if (err != null) {
3911        future.completeExceptionally(err);
3912        return;
3913      }
3914      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3915        if (err1 != null) {
3916          future.completeExceptionally(err1);
3917          return;
3918        }
3919        AsyncAdmin peerAdmin = conn.getAdmin();
3920        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3921          if (err2 != null) {
3922            future.completeExceptionally(err2);
3923            return;
3924          }
3925          if (!exist) {
3926            CompletableFuture<Void> createTableFuture = null;
3927            if (splits == null) {
3928              createTableFuture = peerAdmin.createTable(tableDesc);
3929            } else {
3930              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3931            }
3932            addListener(createTableFuture, (result, err3) -> {
3933              if (err3 != null) {
3934                future.completeExceptionally(err3);
3935              } else {
3936                future.complete(result);
3937              }
3938            });
3939          } else {
3940            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3941              (result, err4) -> {
3942                if (err4 != null) {
3943                  future.completeExceptionally(err4);
3944                } else {
3945                  future.complete(result);
3946                }
3947              });
3948          }
3949        });
3950      });
3951    });
3952    return future;
3953  }
3954
3955  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3956    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3957    CompletableFuture<Void> future = new CompletableFuture<>();
3958    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3959      if (err != null) {
3960        future.completeExceptionally(err);
3961        return;
3962      }
3963      if (peerTableDesc == null) {
3964        future.completeExceptionally(
3965          new IllegalArgumentException("Failed to get table descriptor for table "
3966            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3967        return;
3968      }
3969      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3970        future.completeExceptionally(new IllegalArgumentException(
3971          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3972            + ", but the table descriptors are not same when compared with source cluster."
3973            + " Thus can not enable the table's replication switch."));
3974        return;
3975      }
3976      future.complete(null);
3977    });
3978    return future;
3979  }
3980
3981  /**
3982   * Set the table's replication switch if the table's replication switch is already not set.
3983   * @param tableName name of the table
3984   * @param enableRep is replication switch enable or disable
3985   */
3986  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3987    CompletableFuture<Void> future = new CompletableFuture<>();
3988    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3989      if (err != null) {
3990        future.completeExceptionally(err);
3991        return;
3992      }
3993      if (!tableDesc.matchReplicationScope(enableRep)) {
3994        int scope =
3995          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3996        TableDescriptor newTableDesc =
3997          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3998        addListener(modifyTable(newTableDesc), (result, err2) -> {
3999          if (err2 != null) {
4000            future.completeExceptionally(err2);
4001          } else {
4002            future.complete(result);
4003          }
4004        });
4005      } else {
4006        future.complete(null);
4007      }
4008    });
4009    return future;
4010  }
4011
4012  @Override
4013  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
4014    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
4015    request.setPeerId(peerId);
4016    return this.<Boolean> newMasterCaller()
4017      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
4018        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
4019          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
4020          resp -> resp.getIsEnabled()))
4021      .call();
4022  }
4023
4024  private void waitUntilAllReplicationPeerModificationProceduresDone(
4025    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
4026    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
4027      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
4028        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
4029          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
4030            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
4031            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
4032            resp -> resp.getProcedureList()))
4033        .call();
4034    addListener(callFuture, (r, e) -> {
4035      if (e != null) {
4036        future.completeExceptionally(e);
4037      } else if (r.isEmpty()) {
4038        // we are done
4039        future.complete(prevOn);
4040      } else {
4041        // retry later to see if the procedures are done
4042        retryTimer.newTimeout(
4043          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
4044          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
4045      }
4046    });
4047  }
4048
4049  @Override
4050  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
4051    boolean drainProcedures) {
4052    ReplicationPeerModificationSwitchRequest request =
4053      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
4054    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
4055      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
4056        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
4057          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
4058          resp -> resp.getPreviousValue()))
4059      .call();
4060    // if we do not need to wait all previous peer modification procedure done, or we are enabling
4061    // peer modification, just return here.
4062    if (!drainProcedures || on) {
4063      return callFuture;
4064    }
4065    // otherwise we need to wait until all previous peer modification procedure done
4066    CompletableFuture<Boolean> future = new CompletableFuture<>();
4067    addListener(callFuture, (prevOn, err) -> {
4068      if (err != null) {
4069        future.completeExceptionally(err);
4070        return;
4071      }
4072      // even if the previous state is disabled, we still need to wait here, as there could be
4073      // another client thread which called this method just before us and have already changed the
4074      // state to off, but there are still peer modification procedures not finished, so we should
4075      // also wait here.
4076      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
4077    });
4078    return future;
4079  }
4080
4081  @Override
4082  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
4083    return this.<Boolean> newMasterCaller()
4084      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
4085        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
4086          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
4087          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
4088          (resp) -> resp.getEnabled()))
4089      .call();
4090  }
4091
4092  @Override
4093  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
4094    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
4095    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
4096      if (err != null) {
4097        future.completeExceptionally(err);
4098        return;
4099      }
4100      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
4101        locations.stream().filter(l -> l.getRegion() != null)
4102          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
4103          .collect(Collectors.groupingBy(l -> l.getServerName(),
4104            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
4105      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
4106      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
4107      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
4108        futures
4109          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
4110            if (err2 != null) {
4111              future.completeExceptionally(unwrapCompletionException(err2));
4112            } else {
4113              aggregator.append(stats);
4114            }
4115          }));
4116      }
4117      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
4118        (ret, err3) -> {
4119          if (err3 != null) {
4120            future.completeExceptionally(unwrapCompletionException(err3));
4121          } else {
4122            future.complete(aggregator.sum());
4123          }
4124        });
4125    });
4126    return future;
4127  }
4128
4129  @Override
4130  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
4131    boolean preserveSplits) {
4132    CompletableFuture<Void> future = new CompletableFuture<>();
4133    addListener(tableExists(tableName), (exist, err) -> {
4134      if (err != null) {
4135        future.completeExceptionally(err);
4136        return;
4137      }
4138      if (!exist) {
4139        future.completeExceptionally(new TableNotFoundException(tableName));
4140        return;
4141      }
4142      addListener(tableExists(newTableName), (exist1, err1) -> {
4143        if (err1 != null) {
4144          future.completeExceptionally(err1);
4145          return;
4146        }
4147        if (exist1) {
4148          future.completeExceptionally(new TableExistsException(newTableName));
4149          return;
4150        }
4151        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
4152          if (err2 != null) {
4153            future.completeExceptionally(err2);
4154            return;
4155          }
4156          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
4157          if (preserveSplits) {
4158            addListener(getTableSplits(tableName), (splits, err3) -> {
4159              if (err3 != null) {
4160                future.completeExceptionally(err3);
4161              } else {
4162                addListener(
4163                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
4164                  (result, err4) -> {
4165                    if (err4 != null) {
4166                      future.completeExceptionally(err4);
4167                    } else {
4168                      future.complete(result);
4169                    }
4170                  });
4171              }
4172            });
4173          } else {
4174            addListener(createTable(newTableDesc), (result, err5) -> {
4175              if (err5 != null) {
4176                future.completeExceptionally(err5);
4177              } else {
4178                future.complete(result);
4179              }
4180            });
4181          }
4182        });
4183      });
4184    });
4185    return future;
4186  }
4187
4188  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
4189    List<RegionInfo> hris) {
4190    return this.<CacheEvictionStats> newAdminCaller()
4191      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
4192        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
4193          RequestConverter.buildClearRegionBlockCacheRequest(hris),
4194          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
4195          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
4196      .serverName(serverName).call();
4197  }
4198
4199  @Override
4200  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
4201    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4202      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
4203        Boolean> call(controller, stub,
4204          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
4205          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
4206          resp -> resp.getPreviousRpcThrottleEnabled()))
4207      .call();
4208    return future;
4209  }
4210
4211  @Override
4212  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
4213    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4214      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
4215        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
4216          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
4217          resp -> resp.getRpcThrottleEnabled()))
4218      .call();
4219    return future;
4220  }
4221
4222  @Override
4223  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
4224    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4225      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
4226        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
4227          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
4228            .build(),
4229          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
4230          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
4231      .call();
4232    return future;
4233  }
4234
4235  @Override
4236  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
4237    return this.<Map<TableName, Long>> newMasterCaller()
4238      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
4239        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4240          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4241          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4242          resp -> resp.getSizesList().stream().collect(Collectors
4243            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4244      .call();
4245  }
4246
4247  @Override
4248  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4249    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4250    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4251      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4252        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4253          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4254          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4255          resp -> resp.getSnapshotsList().stream()
4256            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4257              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4258      .serverName(serverName).call();
4259  }
4260
4261  private CompletableFuture<SpaceQuotaSnapshot>
4262    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4263    return this.<SpaceQuotaSnapshot> newMasterCaller()
4264      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4265        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4266          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4267      .call();
4268  }
4269
4270  @Override
4271  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4272    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4273      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4274      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4275  }
4276
4277  @Override
4278  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4279    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4280    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4281      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4282      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4283  }
4284
4285  @Override
4286  public CompletableFuture<Void> grant(UserPermission userPermission,
4287    boolean mergeExistingPermissions) {
4288    return this.<Void> newMasterCaller()
4289      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4290        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4291        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4292      .call();
4293  }
4294
4295  @Override
4296  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4297    return this.<Void> newMasterCaller()
4298      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4299        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4300        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4301      .call();
4302  }
4303
4304  @Override
4305  public CompletableFuture<List<UserPermission>>
4306    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4307    return this.<List<UserPermission>> newMasterCaller()
4308      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4309        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4310          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4311          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4312          resp -> resp.getUserPermissionList().stream()
4313            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4314            .collect(Collectors.toList())))
4315      .call();
4316  }
4317
4318  @Override
4319  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4320    List<Permission> permissions) {
4321    return this.<List<Boolean>> newMasterCaller()
4322      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4323        List<Boolean>> call(controller, stub,
4324          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4325          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4326          resp -> resp.getHasUserPermissionList()))
4327      .call();
4328  }
4329
4330  @Override
4331  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4332    return this.<Boolean> newMasterCaller()
4333      .action((controller, stub) -> this.call(controller, stub,
4334        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4335        MasterService.Interface::switchSnapshotCleanup,
4336        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4337      .call();
4338  }
4339
4340  @Override
4341  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4342    return this.<Boolean> newMasterCaller()
4343      .action((controller, stub) -> this.call(controller, stub,
4344        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4345        MasterService.Interface::isSnapshotCleanupEnabled,
4346        IsSnapshotCleanupEnabledResponse::getEnabled))
4347      .call();
4348  }
4349
4350  @Override
4351  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4352    return this.<Void> newMasterCaller()
4353      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4354        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4355        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4356      .call();
4357  }
4358
4359  @Override
4360  public CompletableFuture<Void> addRSGroup(String groupName) {
4361    return this.<Void> newMasterCaller()
4362      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4363        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4364        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4365      .call();
4366  }
4367
4368  @Override
4369  public CompletableFuture<Void> removeRSGroup(String groupName) {
4370    return this.<Void> newMasterCaller()
4371      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4372        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4373        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4374      .call();
4375  }
4376
4377  @Override
4378  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4379    BalanceRequest request) {
4380    return this.<BalanceResponse> newMasterCaller()
4381      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4382        BalanceResponse> call(controller, stub,
4383          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4384          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4385      .call();
4386  }
4387
4388  @Override
4389  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4390    return this.<List<RSGroupInfo>> newMasterCaller()
4391      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4392        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4393          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4394            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4395      .call();
4396  }
4397
4398  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4399    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4400    final String logType) {
4401    if (CollectionUtils.isEmpty(serverNames)) {
4402      return CompletableFuture.completedFuture(Collections.emptyList());
4403    }
4404    return CompletableFuture.supplyAsync(() -> serverNames
4405      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4406        filterParams, limit, logType))
4407      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4408  }
4409
4410  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4411    Map<String, Object> filterParams, int limit, String logType) {
4412    return this.<List<LogEntry>> newAdminCaller()
4413      .action((controller, stub) -> this.adminCall(controller, stub,
4414        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4415        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4416      .serverName(serverName).call();
4417  }
4418
4419  @Override
4420  public CompletableFuture<List<Boolean>>
4421    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4422    if (CollectionUtils.isEmpty(serverNames)) {
4423      return CompletableFuture.completedFuture(Collections.emptyList());
4424    }
4425    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4426      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4427    return convertToFutureOfList(clearSlowLogResponseList);
4428  }
4429
4430  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4431    return this.<Boolean> newAdminCaller()
4432      .action((controller, stub) -> this.adminCall(controller, stub,
4433        RequestConverter.buildClearSlowLogResponseRequest(),
4434        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4435      .serverName(serverName).call();
4436  }
4437
4438  private static <T> CompletableFuture<List<T>>
4439    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4440    CompletableFuture<Void> allDoneFuture =
4441      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4442    return allDoneFuture
4443      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4444  }
4445
4446  @Override
4447  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4448    return this.<List<TableName>> newMasterCaller()
4449      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4450        List<TableName>> call(controller, stub,
4451          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4452          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4453            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4454      .call();
4455  }
4456
4457  @Override
4458  public CompletableFuture<Pair<List<String>, List<TableName>>>
4459    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4460    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4461      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4462        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4463        Pair<List<String>, List<TableName>>> call(controller, stub,
4464          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4465            .build(),
4466          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4467          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4468            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4469      .call();
4470  }
4471
4472  @Override
4473  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4474    return this.<RSGroupInfo> newMasterCaller()
4475      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4476        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4477          GetRSGroupInfoOfServerRequest.newBuilder()
4478            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4479              .setPort(hostPort.getPort()).build())
4480            .build(),
4481          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4482          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4483      .call();
4484  }
4485
4486  @Override
4487  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4488    return this.<Void> newMasterCaller()
4489      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4490        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4491        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4492      .call();
4493  }
4494
4495  @Override
4496  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4497    CompletableFuture<Void> future = new CompletableFuture<>();
4498    for (TableName tableName : tables) {
4499      addListener(tableExists(tableName), (exist, err) -> {
4500        if (err != null) {
4501          future.completeExceptionally(err);
4502          return;
4503        }
4504        if (!exist) {
4505          future.completeExceptionally(new TableNotFoundException(tableName));
4506          return;
4507        }
4508      });
4509    }
4510    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4511      if (err != null) {
4512        future.completeExceptionally(err);
4513        return;
4514      }
4515      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4516        future.complete(null);
4517        return;
4518      }
4519      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4520      for (TableDescriptor td : tableDescriptions) {
4521        newTableDescriptors
4522          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4523      }
4524      addListener(
4525        CompletableFuture.allOf(
4526          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4527        (v, e) -> {
4528          if (e != null) {
4529            future.completeExceptionally(e);
4530          } else {
4531            future.complete(v);
4532          }
4533        });
4534    });
4535    return future;
4536  }
4537
4538  @Override
4539  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4540    return this.<RSGroupInfo> newMasterCaller()
4541      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4542        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4543          GetRSGroupInfoOfTableRequest.newBuilder()
4544            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4545          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4546          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4547      .call();
4548  }
4549
4550  @Override
4551  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4552    return this.<RSGroupInfo> newMasterCaller()
4553      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4554        RSGroupInfo> call(controller, stub,
4555          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4556          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4557          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4558      .call();
4559  }
4560
4561  @Override
4562  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4563    return this.<Void> newMasterCaller()
4564      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4565        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4566          .setNewRsgroupName(newName).build(),
4567        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4568      .call();
4569  }
4570
4571  @Override
4572  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4573    Map<String, String> configuration) {
4574    UpdateRSGroupConfigRequest.Builder request =
4575      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4576    if (configuration != null) {
4577      configuration.entrySet().forEach(e -> request.addConfiguration(
4578        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4579    }
4580    return this.<Void> newMasterCaller()
4581      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4582        Void> call(controller, stub, request.build(),
4583          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4584      .call();
4585  }
4586
4587  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4588    return this.<List<LogEntry>> newMasterCaller()
4589      .action((controller, stub) -> this.call(controller, stub,
4590        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4591        ProtobufUtil::toBalancerDecisionResponse))
4592      .call();
4593  }
4594
4595  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4596    return this.<List<LogEntry>> newMasterCaller()
4597      .action((controller, stub) -> this.call(controller, stub,
4598        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4599        ProtobufUtil::toBalancerRejectionResponse))
4600      .call();
4601  }
4602
4603  @Override
4604  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4605    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4606    if (logType == null || serverType == null) {
4607      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4608    }
4609    switch (logType) {
4610      case "SLOW_LOG":
4611      case "LARGE_LOG":
4612        if (ServerType.MASTER.equals(serverType)) {
4613          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4614        }
4615        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4616      case "BALANCER_DECISION":
4617        if (ServerType.REGION_SERVER.equals(serverType)) {
4618          throw new IllegalArgumentException(
4619            "Balancer Decision logs are not maintained by HRegionServer");
4620        }
4621        return getBalancerDecisions(limit);
4622      case "BALANCER_REJECTION":
4623        if (ServerType.REGION_SERVER.equals(serverType)) {
4624          throw new IllegalArgumentException(
4625            "Balancer Rejection logs are not maintained by HRegionServer");
4626        }
4627        return getBalancerRejections(limit);
4628      default:
4629        return CompletableFuture.completedFuture(Collections.emptyList());
4630    }
4631  }
4632
4633  @Override
4634  public CompletableFuture<Void> flushMasterStore() {
4635    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4636    return this.<Void> newMasterCaller()
4637      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4638        Void> call(controller, stub, request.build(),
4639          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4640      .call();
4641  }
4642
4643  @Override
4644  public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
4645    GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
4646    return this.<List<String>> newAdminCaller()
4647      .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
4648        List<String>> adminCall(controller, stub, request.build(),
4649          (s, c, req, done) -> s.getCachedFilesList(c, req, done),
4650          resp -> resp.getCachedFilesList()))
4651      .serverName(serverName).call();
4652  }
4653
4654  @Override
4655  public CompletableFuture<Void> restoreBackupSystemTable(String snapshotName) {
4656    MasterProtos.RestoreBackupSystemTableRequest request =
4657      MasterProtos.RestoreBackupSystemTableRequest.newBuilder().setSnapshotName(snapshotName)
4658        .build();
4659    return this.<MasterProtos.RestoreBackupSystemTableRequest,
4660      MasterProtos.RestoreBackupSystemTableResponse> procedureCall(request,
4661        MasterService.Interface::restoreBackupSystemTable,
4662        MasterProtos.RestoreBackupSystemTableResponse::getProcId,
4663        new RestoreBackupSystemTableProcedureBiConsumer());
4664  }
4665}