001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.net.URI;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.Set;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042import java.util.function.BiConsumer;
043import java.util.function.Consumer;
044import java.util.function.Function;
045import java.util.function.Supplier;
046import java.util.regex.Pattern;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CacheEvictionStats;
051import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
052import org.apache.hadoop.hbase.CatalogFamilyFormat;
053import org.apache.hadoop.hbase.ClientMetaTableAccessor;
054import org.apache.hadoop.hbase.ClusterMetrics;
055import org.apache.hadoop.hbase.ClusterMetrics.Option;
056import org.apache.hadoop.hbase.ClusterMetricsBuilder;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HRegionLocation;
060import org.apache.hadoop.hbase.NamespaceDescriptor;
061import org.apache.hadoop.hbase.RegionLocations;
062import org.apache.hadoop.hbase.RegionMetrics;
063import org.apache.hadoop.hbase.RegionMetricsBuilder;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableExistsException;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.TableNotDisabledException;
068import org.apache.hadoop.hbase.TableNotEnabledException;
069import org.apache.hadoop.hbase.TableNotFoundException;
070import org.apache.hadoop.hbase.UnknownRegionException;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
074import org.apache.hadoop.hbase.client.Scan.ReadType;
075import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
076import org.apache.hadoop.hbase.client.replication.TableCFs;
077import org.apache.hadoop.hbase.client.security.SecurityCapability;
078import org.apache.hadoop.hbase.exceptions.DeserializationException;
079import org.apache.hadoop.hbase.ipc.HBaseRpcController;
080import org.apache.hadoop.hbase.net.Address;
081import org.apache.hadoop.hbase.quotas.QuotaFilter;
082import org.apache.hadoop.hbase.quotas.QuotaSettings;
083import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
084import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
085import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
086import org.apache.hadoop.hbase.replication.ReplicationException;
087import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
088import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
089import org.apache.hadoop.hbase.replication.SyncReplicationState;
090import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
091import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
092import org.apache.hadoop.hbase.security.access.Permission;
093import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
094import org.apache.hadoop.hbase.security.access.UserPermission;
095import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
096import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
097import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
098import org.apache.hadoop.hbase.util.Bytes;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
101import org.apache.hadoop.hbase.util.Pair;
102import org.apache.hadoop.hbase.util.Strings;
103import org.apache.yetus.audience.InterfaceAudience;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
108import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
109import org.apache.hbase.thirdparty.com.google.protobuf.Message;
110import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
111import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
112import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
113import org.apache.hbase.thirdparty.io.netty.util.Timeout;
114import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
115import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
116
117import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
118import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
355import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
356import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
357import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
358import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
359import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
360import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
361import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
362
363/**
364 * The implementation of AsyncAdmin.
365 * <p>
366 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
367 * be finished inside the rpc framework thread, which means that the callbacks registered to the
368 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
369 * this class should not try to do time consuming tasks in the callbacks.
370 * @since 2.0.0
371 * @see AsyncHBaseAdmin
372 * @see AsyncConnection#getAdmin()
373 * @see AsyncConnection#getAdminBuilder()
374 */
375@InterfaceAudience.Private
376class RawAsyncHBaseAdmin implements AsyncAdmin {
377
378  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
379
380  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
381
382  private final AsyncConnectionImpl connection;
383
384  private final HashedWheelTimer retryTimer;
385
386  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
387
388  private final long rpcTimeoutNs;
389
390  private final long operationTimeoutNs;
391
392  private final long pauseNs;
393
394  private final long pauseNsForServerOverloaded;
395
396  private final int maxAttempts;
397
398  private final int startLogErrorsCnt;
399
400  private final NonceGenerator ng;
401
402  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
403    AsyncAdminBuilderBase builder) {
404    this.connection = connection;
405    this.retryTimer = retryTimer;
406    this.metaTable = connection.getTable(META_TABLE_NAME);
407    this.rpcTimeoutNs = builder.rpcTimeoutNs;
408    this.operationTimeoutNs = builder.operationTimeoutNs;
409    this.pauseNs = builder.pauseNs;
410    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
411      LOG.warn(
412        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
413          + " the normal pause value {} ms, use the greater one instead",
414        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
415        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
416      this.pauseNsForServerOverloaded = builder.pauseNs;
417    } else {
418      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
419    }
420    this.maxAttempts = builder.maxAttempts;
421    this.startLogErrorsCnt = builder.startLogErrorsCnt;
422    this.ng = connection.getNonceGenerator();
423  }
424
425  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
426    return this.connection.callerFactory.<T> masterRequest()
427      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
428      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
429      .pause(pauseNs, TimeUnit.NANOSECONDS)
430      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
431      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
432  }
433
434  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
435    return this.connection.callerFactory.<T> adminRequest()
436      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
437      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
438      .pause(pauseNs, TimeUnit.NANOSECONDS)
439      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
440      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
441  }
442
  /**
   * A single rpc issued against a master stub. Implementations in this class are lambdas that
   * forward to one method of the stub.
   * @param <RESP> type of the response handed to the callback
   * @param <REQ>  type of the request
   */
  @FunctionalInterface
  private interface MasterRpcCall<RESP, REQ> {
    /**
     * Issues the rpc.
     * @param stub       master stub to call
     * @param controller rpc controller for this call
     * @param req        request to send
     * @param done       callback that receives the response
     */
    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
      RpcCallback<RESP> done);
  }
448
  /**
   * A single rpc issued against an admin stub. Implementations in this class are lambdas that
   * forward to one method of the stub.
   * @param <RESP> type of the response handed to the callback
   * @param <REQ>  type of the request
   */
  @FunctionalInterface
  private interface AdminRpcCall<RESP, REQ> {
    /**
     * Issues the rpc.
     * @param stub       admin stub to call
     * @param controller rpc controller for this call
     * @param req        request to send
     * @param done       callback that receives the response
     */
    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
      RpcCallback<RESP> done);
  }
454
  /**
   * Converts a source value into a destination value, and is allowed to fail with an
   * {@link IOException}.
   * @param <D> destination type
   * @param <S> source type
   */
  @FunctionalInterface
  private interface Converter<D, S> {
    /**
     * Converts the given value.
     * @param src value to convert
     * @return the converted value
     * @throws IOException if the conversion fails
     */
    D convert(S src) throws IOException;
  }
459
460  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
461    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
462    Converter<RESP, PRESP> respConverter) {
463    CompletableFuture<RESP> future = new CompletableFuture<>();
464    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
465
466      @Override
467      public void run(PRESP resp) {
468        if (controller.failed()) {
469          future.completeExceptionally(controller.getFailed());
470        } else {
471          try {
472            future.complete(respConverter.convert(resp));
473          } catch (IOException e) {
474            future.completeExceptionally(e);
475          }
476        }
477      }
478    });
479    return future;
480  }
481
482  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
483    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
484    Converter<RESP, PRESP> respConverter) {
485    CompletableFuture<RESP> future = new CompletableFuture<>();
486    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
487
488      @Override
489      public void run(PRESP resp) {
490        if (controller.failed()) {
491          future.completeExceptionally(controller.getFailed());
492        } else {
493          try {
494            future.complete(respConverter.convert(resp));
495          } catch (IOException e) {
496            future.completeExceptionally(e);
497          }
498        }
499      }
500    });
501    return future;
502  }
503
504  /**
505   * short-circuit call for
506   * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter, Converter, ProcedureBiConsumer)}
507   * by ignoring procedure result
508   */
509  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
510    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
511    ProcedureBiConsumer<Void> consumer) {
512    return procedureCall(preq, rpcCall, respConverter, result -> null, consumer);
513  }
514
515  /**
516   * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter,
517   * ProcedureBiConsumer) by skip setting priority for request
518   */
519  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(PREQ preq,
520    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
521    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
522    return procedureCall(b -> {
523    }, preq, rpcCall, respConverter, resultConverter, consumer);
524  }
525
526  /**
527   * short-circuit call for procedureCall(TableName, Object, MasterRpcCall, Converter, Converter,
528   * ProcedureBiConsumer) by ignoring procedure result
529   */
530  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
531    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
532    ProcedureBiConsumer<Void> consumer) {
533    return procedureCall(tableName, preq, rpcCall, respConverter, result -> null, consumer);
534  }
535
536  /**
537   * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter,
538   * ProcedureBiConsumer) by skip setting priority for request
539   */
540  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName tableName, PREQ preq,
541    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
542    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
543    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, resultConverter,
544      consumer);
545  }
546
547  /**
548   * @param <PREQ>          type of request
549   * @param <PRESP>         type of response
550   * @param <PRES>          type of procedure call result
551   * @param prioritySetter  prioritySetter set priority by table for request
552   * @param preq            procedure call request
553   * @param rpcCall         procedure rpc call
554   * @param respConverter   extract proc id from procedure call response
555   * @param resultConverter extract result from procedure call result
556   * @param consumer        action performs on result
557   * @return procedure call result, null if procedure is void
558   */
559  private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(
560    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
561    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
562    Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) {
563    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller()
564      .action((controller, stub) -> this.call(controller, stub, preq, rpcCall, respConverter));
565    prioritySetter.accept(builder);
566    CompletableFuture<Long> procFuture = builder.call();
567    CompletableFuture<PRES> future = waitProcedureResult(procFuture, resultConverter);
568    addListener(future, consumer);
569    return future;
570  }
571
572  @Override
573  public CompletableFuture<Boolean> tableExists(TableName tableName) {
574    if (TableName.isMetaTableName(tableName)) {
575      return CompletableFuture.completedFuture(true);
576    }
577    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
578  }
579
580  @Override
581  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
582    return getTableDescriptors(
583      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
584  }
585
586  /**
587   * {@link #listTableDescriptors(boolean)}
588   */
589  @Override
590  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
591    boolean includeSysTables) {
592    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
593      + "use listTableDescriptors(boolean) instead");
594    return getTableDescriptors(
595      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
596  }
597
598  @Override
599  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
600    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
601      + "use listTableDescriptors(boolean) instead");
602    if (tableNames.isEmpty()) {
603      return CompletableFuture.completedFuture(Collections.emptyList());
604    }
605    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
606  }
607
608  private CompletableFuture<List<TableDescriptor>>
609    getTableDescriptors(GetTableDescriptorsRequest request) {
610    return this.<List<TableDescriptor>> newMasterCaller()
611      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
612        List<TableDescriptor>> call(controller, stub, request,
613          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
614          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
615      .call();
616  }
617
618  @Override
619  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
620    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
621  }
622
623  @Override
624  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
625    boolean includeSysTables) {
626    Preconditions.checkNotNull(pattern,
627      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
628    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
629  }
630
631  @Override
632  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
633    return this.<List<TableName>> newMasterCaller()
634      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
635        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
636          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
637          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
638          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
639      .call();
640  }
641
642  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
643    return this.<List<TableName>> newMasterCaller()
644      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
645        List<TableName>> call(controller, stub, request,
646          (s, c, req, done) -> s.getTableNames(c, req, done),
647          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
648      .call();
649  }
650
651  @Override
652  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
653    return this.<List<TableDescriptor>> newMasterCaller()
654      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
655        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
656          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
657          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
658          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
659      .call();
660  }
661
662  @Override
663  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
664    return this.<List<TableDescriptor>> newMasterCaller()
665      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
666        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
667          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
668          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
669          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
670      .call();
671  }
672
673  @Override
674  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
675    return this.<List<TableName>> newMasterCaller()
676      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
677        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
678          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
679          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
680          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
681      .call();
682  }
683
684  @Override
685  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
686    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
687    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
688      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
689        List<TableSchema>> call(controller, stub,
690          RequestConverter.buildGetTableDescriptorsRequest(tableName),
691          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
692          (resp) -> resp.getTableSchemaList()))
693      .call(), (tableSchemas, error) -> {
694        if (error != null) {
695          future.completeExceptionally(error);
696          return;
697        }
698        if (!tableSchemas.isEmpty()) {
699          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
700        } else {
701          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
702        }
703      });
704    return future;
705  }
706
707  @Override
708  public CompletableFuture<Void> createTable(TableDescriptor desc) {
709    return createTable(desc.getTableName(),
710      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
711  }
712
713  @Override
714  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
715    int numRegions) {
716    try {
717      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
718    } catch (IllegalArgumentException e) {
719      return failedFuture(e);
720    }
721  }
722
723  @Override
724  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
725    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
726      + " use createTable(TableDescriptor) instead");
727    try {
728      verifySplitKeys(splitKeys);
729      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
730        splitKeys, ng.getNonceGroup(), ng.newNonce()));
731    } catch (IllegalArgumentException e) {
732      return failedFuture(e);
733    }
734  }
735
736  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
737    Preconditions.checkNotNull(tableName, "table name is null");
738    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
739      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
740      new CreateTableProcedureBiConsumer(tableName));
741  }
742
743  @Override
744  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
745    return modifyTable(desc, true);
746  }
747
748  @Override
749  public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) {
750    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
751      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
752        ng.newNonce(), reopenRegions),
753      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
754      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
755  }
756
757  @Override
758  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
759    return this.<ModifyTableStoreFileTrackerRequest,
760      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
761        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
762          ng.getNonceGroup(), ng.newNonce()),
763        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
764        (resp) -> resp.getProcId(),
765        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
766  }
767
768  @Override
769  public CompletableFuture<Void> deleteTable(TableName tableName) {
770    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
771      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
772      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
773      new DeleteTableProcedureBiConsumer(tableName));
774  }
775
776  @Override
777  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
778    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
779      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
780        ng.newNonce()),
781      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
782      new TruncateTableProcedureBiConsumer(tableName));
783  }
784
785  @Override
786  public CompletableFuture<Void> enableTable(TableName tableName) {
787    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
788      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
789      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
790      new EnableTableProcedureBiConsumer(tableName));
791  }
792
793  @Override
794  public CompletableFuture<Void> disableTable(TableName tableName) {
795    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
796      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
797      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
798      new DisableTableProcedureBiConsumer(tableName));
799  }
800
801  /**
802   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
803   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
804   * targetState).
805   */
806  private static CompletableFuture<Boolean> completeCheckTableState(
807    CompletableFuture<Boolean> future, Optional<TableState> tableState, Throwable error,
808    TableState.State targetState, TableName tableName) {
809    if (error != null) {
810      future.completeExceptionally(error);
811    } else {
812      if (tableState.isPresent()) {
813        future.complete(tableState.get().inStates(targetState));
814      } else {
815        future.completeExceptionally(new TableNotFoundException(tableName));
816      }
817    }
818    return future;
819  }
820
821  @Override
822  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
823    if (TableName.isMetaTableName(tableName)) {
824      return CompletableFuture.completedFuture(true);
825    }
826    CompletableFuture<Boolean> future = new CompletableFuture<>();
827    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
828      (tableState, error) -> {
829        completeCheckTableState(future, tableState, error, TableState.State.ENABLED, tableName);
830      });
831    return future;
832  }
833
834  @Override
835  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
836    if (TableName.isMetaTableName(tableName)) {
837      return CompletableFuture.completedFuture(false);
838    }
839    CompletableFuture<Boolean> future = new CompletableFuture<>();
840    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
841      (tableState, error) -> {
842        completeCheckTableState(future, tableState, error, TableState.State.DISABLED, tableName);
843      });
844    return future;
845  }
846
847  @Override
848  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
849    if (TableName.isMetaTableName(tableName)) {
850      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
851        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
852    }
853    CompletableFuture<Boolean> future = new CompletableFuture<>();
854    addListener(isTableEnabled(tableName), (enabled, error) -> {
855      if (error != null) {
856        if (error instanceof TableNotFoundException) {
857          future.complete(false);
858        } else {
859          future.completeExceptionally(error);
860        }
861        return;
862      }
863      if (!enabled) {
864        future.complete(false);
865      } else {
866        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
867          (locations, error1) -> {
868            if (error1 != null) {
869              future.completeExceptionally(error1);
870              return;
871            }
872            List<HRegionLocation> notDeployedRegions = locations.stream()
873              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
874            if (notDeployedRegions.size() > 0) {
875              if (LOG.isDebugEnabled()) {
876                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
877              }
878              future.complete(false);
879              return;
880            }
881            future.complete(true);
882          });
883      }
884    });
885    return future;
886  }
887
888  @Override
889  public CompletableFuture<Void> addColumnFamily(TableName tableName,
890    ColumnFamilyDescriptor columnFamily) {
891    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
892      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
893        ng.newNonce()),
894      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
895      new AddColumnFamilyProcedureBiConsumer(tableName));
896  }
897
898  @Override
899  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
900    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
901      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
902        ng.newNonce()),
903      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
904      new DeleteColumnFamilyProcedureBiConsumer(tableName));
905  }
906
907  @Override
908  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
909    ColumnFamilyDescriptor columnFamily) {
910    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
911      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
912        ng.newNonce()),
913      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
914      new ModifyColumnFamilyProcedureBiConsumer(tableName));
915  }
916
917  @Override
918  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
919    byte[] family, String dstSFT) {
920    return this.<ModifyColumnStoreFileTrackerRequest,
921      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
922        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
923          ng.getNonceGroup(), ng.newNonce()),
924        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
925        (resp) -> resp.getProcId(),
926        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
927  }
928
929  @Override
930  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
931    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
932      RequestConverter.buildCreateNamespaceRequest(descriptor),
933      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
934      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
935  }
936
937  @Override
938  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
939    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
940      RequestConverter.buildModifyNamespaceRequest(descriptor),
941      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
942      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
943  }
944
945  @Override
946  public CompletableFuture<Void> deleteNamespace(String name) {
947    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
948      RequestConverter.buildDeleteNamespaceRequest(name),
949      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
950      new DeleteNamespaceProcedureBiConsumer(name));
951  }
952
953  @Override
954  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
955    return this.<NamespaceDescriptor> newMasterCaller()
956      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
957        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
958          RequestConverter.buildGetNamespaceDescriptorRequest(name),
959          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
960          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
961      .call();
962  }
963
964  @Override
965  public CompletableFuture<List<String>> listNamespaces() {
966    return this.<List<String>> newMasterCaller()
967      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
968        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
969          (s, c, req, done) -> s.listNamespaces(c, req, done),
970          (resp) -> resp.getNamespaceNameList()))
971      .call();
972  }
973
974  @Override
975  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
976    return this.<List<NamespaceDescriptor>> newMasterCaller()
977      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
978        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
979          ListNamespaceDescriptorsRequest.newBuilder().build(),
980          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
981          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
982      .call();
983  }
984
985  @Override
986  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
987    return this.<List<RegionInfo>> newAdminCaller()
988      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
989        List<RegionInfo>> adminCall(controller, stub,
990          RequestConverter.buildGetOnlineRegionRequest(),
991          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
992          resp -> ProtobufUtil.getRegionInfos(resp)))
993      .serverName(serverName).call();
994  }
995
996  @Override
997  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
998    if (tableName.equals(META_TABLE_NAME)) {
999      return connection.registry.getMetaRegionLocations()
1000        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
1001          .collect(Collectors.toList()));
1002    } else {
1003      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
1004        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
1005    }
1006  }
1007
1008  @Override
1009  public CompletableFuture<Void> flush(TableName tableName) {
1010    return flush(tableName, Collections.emptyList());
1011  }
1012
1013  @Override
1014  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
1015    Preconditions.checkNotNull(columnFamily,
1016      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
1017    return flush(tableName, Collections.singletonList(columnFamily));
1018  }
1019
1020  @Override
1021  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
1022    // This is for keeping compatibility with old implementation.
1023    // If the server version is lower than the client version, it's possible that the
1024    // flushTable method is not present in the server side, if so, we need to fall back
1025    // to the old implementation.
1026    Preconditions.checkNotNull(columnFamilyList,
1027      "columnFamily is null, If you don't specify a columnFamily, use flush(TableName) instead.");
1028    List<byte[]> columnFamilies = columnFamilyList.stream()
1029      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
1030    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
1031      ng.getNonceGroup(), ng.newNonce());
1032    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
1033      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
1034      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
1035    CompletableFuture<Void> future = new CompletableFuture<>();
1036    addListener(procFuture, (ret, error) -> {
1037      if (error != null) {
1038        if (
1039          error instanceof TableNotFoundException || error instanceof TableNotEnabledException
1040            || error instanceof NoSuchColumnFamilyException
1041        ) {
1042          future.completeExceptionally(error);
1043        } else if (error instanceof DoNotRetryIOException) {
1044          // usually this is caused by the method is not present on the server or
1045          // the hbase hadoop version does not match the running hadoop version.
1046          // if that happens, we need fall back to the old flush implementation.
1047          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
1048          legacyFlush(future, tableName, columnFamilies);
1049        } else {
1050          future.completeExceptionally(error);
1051        }
1052      } else {
1053        future.complete(ret);
1054      }
1055    });
1056    return future;
1057  }
1058
1059  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1060    List<byte[]> columnFamilies) {
1061    addListener(tableExists(tableName), (exists, err) -> {
1062      if (err != null) {
1063        future.completeExceptionally(err);
1064      } else if (!exists) {
1065        future.completeExceptionally(new TableNotFoundException(tableName));
1066      } else {
1067        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
1068          if (err2 != null) {
1069            future.completeExceptionally(err2);
1070          } else if (!tableEnabled) {
1071            future.completeExceptionally(new TableNotEnabledException(tableName));
1072          } else {
1073            Map<String, String> props = new HashMap<>();
1074            if (columnFamilies != null && !columnFamilies.isEmpty()) {
1075              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1076                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
1077            }
1078            addListener(
1079              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
1080              (ret, err3) -> {
1081                if (err3 != null) {
1082                  future.completeExceptionally(err3);
1083                } else {
1084                  future.complete(ret);
1085                }
1086              });
1087          }
1088        });
1089      }
1090    });
1091  }
1092
1093  @Override
1094  public CompletableFuture<Void> flushRegion(byte[] regionName) {
1095    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
1096    });
1097  }
1098
1099  @Override
1100  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
1101    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1102      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
1103    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
1104    });
1105  }
1106
1107  /**
1108   * This method is for internal use only, where we need the response of the flush.
1109   * <p/>
1110   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1111   * API.
1112   */
1113  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1114    boolean writeFlushWALMarker) {
1115    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1116    addListener(getRegionLocation(regionName), (location, err) -> {
1117      if (err != null) {
1118        future.completeExceptionally(err);
1119        return;
1120      }
1121      ServerName serverName = location.getServerName();
1122      if (serverName == null) {
1123        future
1124          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1125        return;
1126      }
1127      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1128        (ret, err2) -> {
1129          if (err2 != null) {
1130            future.completeExceptionally(err2);
1131          } else {
1132            future.complete(ret);
1133          }
1134        });
1135    });
1136    return future;
1137  }
1138
1139  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1140    byte[] columnFamily, boolean writeFlushWALMarker) {
1141    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1142      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1143        FlushRegionResponse> adminCall(controller, stub,
1144          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1145            writeFlushWALMarker),
1146          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1147      .call();
1148  }
1149
1150  @Override
1151  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1152    CompletableFuture<Void> future = new CompletableFuture<>();
1153    addListener(getRegions(sn), (hRegionInfos, err) -> {
1154      if (err != null) {
1155        future.completeExceptionally(err);
1156        return;
1157      }
1158      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1159      if (hRegionInfos != null) {
1160        hRegionInfos
1161          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1162          })));
1163      }
1164      addListener(CompletableFuture.allOf(
1165        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1166          if (err2 != null) {
1167            future.completeExceptionally(err2);
1168          } else {
1169            future.complete(ret);
1170          }
1171        });
1172    });
1173    return future;
1174  }
1175
1176  @Override
1177  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1178    return compact(tableName, null, false, compactType);
1179  }
1180
1181  @Override
1182  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1183    CompactType compactType) {
1184    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1185      + "If you don't specify a columnFamily, use compact(TableName) instead");
1186    return compact(tableName, columnFamily, false, compactType);
1187  }
1188
1189  @Override
1190  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1191    return compactRegion(regionName, null, false);
1192  }
1193
1194  @Override
1195  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1196    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1197      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1198    return compactRegion(regionName, columnFamily, false);
1199  }
1200
1201  @Override
1202  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1203    return compact(tableName, null, true, compactType);
1204  }
1205
1206  @Override
1207  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1208    CompactType compactType) {
1209    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1210      + "If you don't specify a columnFamily, use compact(TableName) instead");
1211    return compact(tableName, columnFamily, true, compactType);
1212  }
1213
1214  @Override
1215  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1216    return compactRegion(regionName, null, true);
1217  }
1218
1219  @Override
1220  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1221    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1222      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1223    return compactRegion(regionName, columnFamily, true);
1224  }
1225
1226  @Override
1227  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1228    return compactRegionServer(sn, false);
1229  }
1230
1231  @Override
1232  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1233    return compactRegionServer(sn, true);
1234  }
1235
1236  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1237    CompletableFuture<Void> future = new CompletableFuture<>();
1238    addListener(getRegions(sn), (hRegionInfos, err) -> {
1239      if (err != null) {
1240        future.completeExceptionally(err);
1241        return;
1242      }
1243      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1244      if (hRegionInfos != null) {
1245        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1246      }
1247      addListener(CompletableFuture.allOf(
1248        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1249          if (err2 != null) {
1250            future.completeExceptionally(err2);
1251          } else {
1252            future.complete(ret);
1253          }
1254        });
1255    });
1256    return future;
1257  }
1258
1259  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1260    boolean major) {
1261    CompletableFuture<Void> future = new CompletableFuture<>();
1262    addListener(getRegionLocation(regionName), (location, err) -> {
1263      if (err != null) {
1264        future.completeExceptionally(err);
1265        return;
1266      }
1267      ServerName serverName = location.getServerName();
1268      if (serverName == null) {
1269        future
1270          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1271        return;
1272      }
1273      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1274        (ret, err2) -> {
1275          if (err2 != null) {
1276            future.completeExceptionally(err2);
1277          } else {
1278            future.complete(ret);
1279          }
1280        });
1281    });
1282    return future;
1283  }
1284
1285  /**
1286   * List all region locations for the specific table.
1287   */
1288  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1289    if (TableName.META_TABLE_NAME.equals(tableName)) {
1290      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1291      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1292        if (err != null) {
1293          future.completeExceptionally(err);
1294        } else if (
1295          metaRegions == null || metaRegions.isEmpty()
1296            || metaRegions.getDefaultRegionLocation() == null
1297        ) {
1298          future.completeExceptionally(new IOException("meta region does not found"));
1299        } else {
1300          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1301        }
1302      });
1303      return future;
1304    } else {
1305      // For non-meta table, we fetch all locations by scanning hbase:meta table
1306      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1307    }
1308  }
1309
1310  /**
1311   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1312   */
1313  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1314    CompactType compactType) {
1315    CompletableFuture<Void> future = new CompletableFuture<>();
1316
1317    switch (compactType) {
1318      case MOB:
1319        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1320          if (err != null) {
1321            future.completeExceptionally(err);
1322            return;
1323          }
1324          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1325          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1326            if (err2 != null) {
1327              future.completeExceptionally(err2);
1328            } else {
1329              future.complete(ret);
1330            }
1331          });
1332        });
1333        break;
1334      case NORMAL:
1335        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1336          if (err != null) {
1337            future.completeExceptionally(err);
1338            return;
1339          }
1340          if (locations == null || locations.isEmpty()) {
1341            future.completeExceptionally(new TableNotFoundException(tableName));
1342            return;
1343          }
1344          CompletableFuture<?>[] compactFutures =
1345            locations.stream().filter(l -> l.getRegion() != null)
1346              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1347              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1348              .toArray(CompletableFuture<?>[]::new);
1349          // future complete unless all of the compact futures are completed.
1350          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1351            if (err2 != null) {
1352              future.completeExceptionally(err2);
1353            } else {
1354              future.complete(ret);
1355            }
1356          });
1357        });
1358        break;
1359      default:
1360        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1361    }
1362    return future;
1363  }
1364
1365  /**
1366   * Compact the region at specific region server.
1367   */
1368  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1369    final boolean major, byte[] columnFamily) {
1370    return this.<Void> newAdminCaller().serverName(sn)
1371      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1372        Void> adminCall(controller, stub,
1373          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1374          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1375      .call();
1376  }
1377
1378  private byte[] toEncodeRegionName(byte[] regionName) {
1379    return RegionInfo.isEncodedRegionName(regionName)
1380      ? regionName
1381      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1382  }
1383
1384  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1385    CompletableFuture<TableName> result) {
1386    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1387      if (err != null) {
1388        result.completeExceptionally(err);
1389        return;
1390      }
1391      RegionInfo regionInfo = location.getRegion();
1392      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1393        result.completeExceptionally(
1394          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1395        return;
1396      }
1397      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1398        if (!tableName.get().equals(regionInfo.getTable())) {
1399          // tables of this two region should be same.
1400          result.completeExceptionally(
1401            new IllegalArgumentException("Cannot merge regions from two different tables "
1402              + tableName.get() + " and " + regionInfo.getTable()));
1403        } else {
1404          result.complete(tableName.get());
1405        }
1406      }
1407    });
1408  }
1409
1410  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1411    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1412    CompletableFuture<TableName> future = new CompletableFuture<>();
1413    for (byte[] encodedRegionName : encodedRegionNames) {
1414      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1415    }
1416    return future;
1417  }
1418
1419  @Override
1420  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1421    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1422  }
1423
1424  @Override
1425  public CompletableFuture<Boolean> isMergeEnabled() {
1426    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1427  }
1428
1429  @Override
1430  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1431    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1432  }
1433
1434  @Override
1435  public CompletableFuture<Boolean> isSplitEnabled() {
1436    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1437  }
1438
1439  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1440    MasterSwitchType switchType) {
1441    SetSplitOrMergeEnabledRequest request =
1442      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1443    return this.<Boolean> newMasterCaller()
1444      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1445        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1446          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1447          (resp) -> resp.getPrevValueList().get(0)))
1448      .call();
1449  }
1450
1451  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1452    IsSplitOrMergeEnabledRequest request =
1453      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1454    return this.<Boolean> newMasterCaller()
1455      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1456        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1457          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1458      .call();
1459  }
1460
1461  @Override
1462  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1463    if (nameOfRegionsToMerge.size() < 2) {
1464      return failedFuture(new IllegalArgumentException(
1465        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1466    }
1467    CompletableFuture<Void> future = new CompletableFuture<>();
1468    byte[][] encodedNameOfRegionsToMerge =
1469      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1470
1471    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1472      if (err != null) {
1473        future.completeExceptionally(err);
1474        return;
1475      }
1476
1477      final MergeTableRegionsRequest request;
1478      try {
1479        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1480          forcible, ng.getNonceGroup(), ng.newNonce());
1481      } catch (DeserializationException e) {
1482        future.completeExceptionally(e);
1483        return;
1484      }
1485
1486      addListener(
1487        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1488          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1489        (ret, err2) -> {
1490          if (err2 != null) {
1491            future.completeExceptionally(err2);
1492          } else {
1493            future.complete(ret);
1494          }
1495        });
1496    });
1497    return future;
1498  }
1499
1500  @Override
1501  public CompletableFuture<Void> split(TableName tableName) {
1502    CompletableFuture<Void> future = new CompletableFuture<>();
1503    addListener(tableExists(tableName), (exist, error) -> {
1504      if (error != null) {
1505        future.completeExceptionally(error);
1506        return;
1507      }
1508      if (!exist) {
1509        future.completeExceptionally(new TableNotFoundException(tableName));
1510        return;
1511      }
1512      addListener(metaTable
1513        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1514          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1515            ClientMetaTableAccessor.QueryType.REGION))
1516          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1517            ClientMetaTableAccessor.QueryType.REGION))),
1518        (results, err2) -> {
1519          if (err2 != null) {
1520            future.completeExceptionally(err2);
1521            return;
1522          }
1523          if (results != null && !results.isEmpty()) {
1524            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1525            for (Result r : results) {
1526              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1527                continue;
1528              }
1529              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1530              if (rl != null) {
1531                for (HRegionLocation h : rl.getRegionLocations()) {
1532                  if (h != null && h.getServerName() != null) {
1533                    RegionInfo hri = h.getRegion();
1534                    if (
1535                      hri == null || hri.isSplitParent()
1536                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1537                    ) {
1538                      continue;
1539                    }
1540                    splitFutures.add(split(hri, null));
1541                  }
1542                }
1543              }
1544            }
1545            addListener(
1546              CompletableFuture
1547                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1548              (ret, exception) -> {
1549                if (exception != null) {
1550                  future.completeExceptionally(exception);
1551                  return;
1552                }
1553                future.complete(ret);
1554              });
1555          } else {
1556            future.complete(null);
1557          }
1558        });
1559    });
1560    return future;
1561  }
1562
1563  @Override
1564  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1565    CompletableFuture<Void> result = new CompletableFuture<>();
1566    if (splitPoint == null) {
1567      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1568    }
1569    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1570      (loc, err) -> {
1571        if (err != null) {
1572          result.completeExceptionally(err);
1573        } else if (loc == null || loc.getRegion() == null) {
1574          result.completeExceptionally(new IllegalArgumentException(
1575            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1576        } else {
1577          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1578            if (err2 != null) {
1579              result.completeExceptionally(err2);
1580            } else {
1581              result.complete(ret);
1582            }
1583
1584          });
1585        }
1586      });
1587    return result;
1588  }
1589
1590  @Override
1591  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1592    CompletableFuture<Void> future = new CompletableFuture<>();
1593    addListener(getRegionLocation(regionName), (location, err) -> {
1594      if (err != null) {
1595        future.completeExceptionally(err);
1596        return;
1597      }
1598      RegionInfo regionInfo = location.getRegion();
1599      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1600        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1601          + "Replicas are auto-split when their primary is split."));
1602        return;
1603      }
1604      ServerName serverName = location.getServerName();
1605      if (serverName == null) {
1606        future
1607          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1608        return;
1609      }
1610      addListener(split(regionInfo, null), (ret, err2) -> {
1611        if (err2 != null) {
1612          future.completeExceptionally(err2);
1613        } else {
1614          future.complete(ret);
1615        }
1616      });
1617    });
1618    return future;
1619  }
1620
1621  @Override
1622  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1623    Preconditions.checkNotNull(splitPoint,
1624      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1625    CompletableFuture<Void> future = new CompletableFuture<>();
1626    addListener(getRegionLocation(regionName), (location, err) -> {
1627      if (err != null) {
1628        future.completeExceptionally(err);
1629        return;
1630      }
1631      RegionInfo regionInfo = location.getRegion();
1632      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1633        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1634          + "Replicas are auto-split when their primary is split."));
1635        return;
1636      }
1637      ServerName serverName = location.getServerName();
1638      if (serverName == null) {
1639        future
1640          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1641        return;
1642      }
1643      if (
1644        regionInfo.getStartKey() != null
1645          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1646      ) {
1647        future.completeExceptionally(
1648          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1649        return;
1650      }
1651      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1652        if (err2 != null) {
1653          future.completeExceptionally(err2);
1654        } else {
1655          future.complete(ret);
1656        }
1657      });
1658    });
1659    return future;
1660  }
1661
1662  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1663    CompletableFuture<Void> future = new CompletableFuture<>();
1664    TableName tableName = hri.getTable();
1665    final SplitTableRegionRequest request;
1666    try {
1667      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1668        ng.newNonce());
1669    } catch (DeserializationException e) {
1670      future.completeExceptionally(e);
1671      return future;
1672    }
1673
1674    addListener(
1675      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1676        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1677      (ret, err2) -> {
1678        if (err2 != null) {
1679          future.completeExceptionally(err2);
1680        } else {
1681          future.complete(ret);
1682        }
1683      });
1684    return future;
1685  }
1686
1687  @Override
1688  public CompletableFuture<Void> truncateRegion(byte[] regionName) {
1689    CompletableFuture<Void> future = new CompletableFuture<>();
1690    addListener(getRegionLocation(regionName), (location, err) -> {
1691      if (err != null) {
1692        future.completeExceptionally(err);
1693        return;
1694      }
1695      RegionInfo regionInfo = location.getRegion();
1696      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1697        future.completeExceptionally(new IllegalArgumentException(
1698          "Can't truncate replicas directly.Replicas are auto-truncated "
1699            + "when their primary is truncated."));
1700        return;
1701      }
1702      ServerName serverName = location.getServerName();
1703      if (serverName == null) {
1704        future
1705          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1706        return;
1707      }
1708      addListener(truncateRegion(regionInfo), (ret, err2) -> {
1709        if (err2 != null) {
1710          future.completeExceptionally(err2);
1711        } else {
1712          future.complete(ret);
1713        }
1714      });
1715    });
1716    return future;
1717  }
1718
1719  private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
1720    CompletableFuture<Void> future = new CompletableFuture<>();
1721    TableName tableName = hri.getTable();
1722    final MasterProtos.TruncateRegionRequest request;
1723    try {
1724      request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
1725    } catch (DeserializationException e) {
1726      future.completeExceptionally(e);
1727      return future;
1728    }
1729    addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
1730      MasterProtos.TruncateRegionResponse::getProcId,
1731      new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1732        if (err2 != null) {
1733          future.completeExceptionally(err2);
1734        } else {
1735          future.complete(ret);
1736        }
1737      });
1738    return future;
1739  }
1740
1741  @Override
1742  public CompletableFuture<Void> assign(byte[] regionName) {
1743    CompletableFuture<Void> future = new CompletableFuture<>();
1744    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1745      if (err != null) {
1746        future.completeExceptionally(err);
1747        return;
1748      }
1749      addListener(
1750        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1751          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1752            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1753            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1754          .call(),
1755        (ret, err2) -> {
1756          if (err2 != null) {
1757            future.completeExceptionally(err2);
1758          } else {
1759            future.complete(ret);
1760          }
1761        });
1762    });
1763    return future;
1764  }
1765
1766  @Override
1767  public CompletableFuture<Void> unassign(byte[] regionName) {
1768    CompletableFuture<Void> future = new CompletableFuture<>();
1769    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1770      if (err != null) {
1771        future.completeExceptionally(err);
1772        return;
1773      }
1774      addListener(
1775        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1776          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1777            Void> call(controller, stub,
1778              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1779              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1780          .call(),
1781        (ret, err2) -> {
1782          if (err2 != null) {
1783            future.completeExceptionally(err2);
1784          } else {
1785            future.complete(ret);
1786          }
1787        });
1788    });
1789    return future;
1790  }
1791
1792  @Override
1793  public CompletableFuture<Void> offline(byte[] regionName) {
1794    CompletableFuture<Void> future = new CompletableFuture<>();
1795    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1796      if (err != null) {
1797        future.completeExceptionally(err);
1798        return;
1799      }
1800      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1801        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1802          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1803          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1804        .call(), (ret, err2) -> {
1805          if (err2 != null) {
1806            future.completeExceptionally(err2);
1807          } else {
1808            future.complete(ret);
1809          }
1810        });
1811    });
1812    return future;
1813  }
1814
1815  @Override
1816  public CompletableFuture<Void> move(byte[] regionName) {
1817    CompletableFuture<Void> future = new CompletableFuture<>();
1818    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1819      if (err != null) {
1820        future.completeExceptionally(err);
1821        return;
1822      }
1823      addListener(
1824        moveRegion(regionInfo,
1825          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1826        (ret, err2) -> {
1827          if (err2 != null) {
1828            future.completeExceptionally(err2);
1829          } else {
1830            future.complete(ret);
1831          }
1832        });
1833    });
1834    return future;
1835  }
1836
1837  @Override
1838  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1839    Preconditions.checkNotNull(destServerName,
1840      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1841    CompletableFuture<Void> future = new CompletableFuture<>();
1842    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1843      if (err != null) {
1844        future.completeExceptionally(err);
1845        return;
1846      }
1847      addListener(
1848        moveRegion(regionInfo, RequestConverter
1849          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1850        (ret, err2) -> {
1851          if (err2 != null) {
1852            future.completeExceptionally(err2);
1853          } else {
1854            future.complete(ret);
1855          }
1856        });
1857    });
1858    return future;
1859  }
1860
1861  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1862    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1863      .action(
1864        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1865          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1866      .call();
1867  }
1868
1869  @Override
1870  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1871    return this.<Void> newMasterCaller()
1872      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1873        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1874        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1875      .call();
1876  }
1877
1878  @Override
1879  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1880    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1881    Scan scan = QuotaTableUtil.makeScan(filter);
1882    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1883      new AdvancedScanResultConsumer() {
1884        List<QuotaSettings> settings = new ArrayList<>();
1885
1886        @Override
1887        public void onNext(Result[] results, ScanController controller) {
1888          for (Result result : results) {
1889            try {
1890              QuotaTableUtil.parseResultToCollection(result, settings);
1891            } catch (IOException e) {
1892              controller.terminate();
1893              future.completeExceptionally(e);
1894            }
1895          }
1896        }
1897
1898        @Override
1899        public void onError(Throwable error) {
1900          future.completeExceptionally(error);
1901        }
1902
1903        @Override
1904        public void onComplete() {
1905          future.complete(settings);
1906        }
1907      });
1908    return future;
1909  }
1910
1911  @Override
1912  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1913    boolean enabled) {
1914    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1915      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1916      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1917      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1918  }
1919
1920  @Override
1921  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1922    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1923      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1924      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1925      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1926  }
1927
1928  @Override
1929  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1930    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1931      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1932      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1933      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1934  }
1935
1936  @Override
1937  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1938    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1939      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1940      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1941      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1942  }
1943
1944  @Override
1945  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1946    return this.<ReplicationPeerConfig> newMasterCaller()
1947      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1948        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1949          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1950          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1951          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1952      .call();
1953  }
1954
1955  @Override
1956  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1957    ReplicationPeerConfig peerConfig) {
1958    return this.<UpdateReplicationPeerConfigRequest,
1959      UpdateReplicationPeerConfigResponse> procedureCall(
1960        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1961        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1962        (resp) -> resp.getProcId(),
1963        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1964  }
1965
1966  @Override
1967  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1968    SyncReplicationState clusterState) {
1969    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1970      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1971        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1972          clusterState),
1973        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1974        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1975          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1976  }
1977
1978  @Override
1979  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1980    Map<TableName, List<String>> tableCfs) {
1981    if (tableCfs == null) {
1982      return failedFuture(new ReplicationException("tableCfs is null"));
1983    }
1984
1985    CompletableFuture<Void> future = new CompletableFuture<>();
1986    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1987      if (!completeExceptionally(future, error)) {
1988        ReplicationPeerConfig newPeerConfig =
1989          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1990        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1991          if (!completeExceptionally(future, error)) {
1992            future.complete(result);
1993          }
1994        });
1995      }
1996    });
1997    return future;
1998  }
1999
2000  @Override
2001  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
2002    Map<TableName, List<String>> tableCfs) {
2003    if (tableCfs == null) {
2004      return failedFuture(new ReplicationException("tableCfs is null"));
2005    }
2006
2007    CompletableFuture<Void> future = new CompletableFuture<>();
2008    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
2009      if (!completeExceptionally(future, error)) {
2010        ReplicationPeerConfig newPeerConfig = null;
2011        try {
2012          newPeerConfig = ReplicationPeerConfigUtil
2013            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
2014        } catch (ReplicationException e) {
2015          future.completeExceptionally(e);
2016          return;
2017        }
2018        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
2019          if (!completeExceptionally(future, error)) {
2020            future.complete(result);
2021          }
2022        });
2023      }
2024    });
2025    return future;
2026  }
2027
2028  @Override
2029  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
2030    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
2031  }
2032
2033  @Override
2034  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
2035    Preconditions.checkNotNull(pattern,
2036      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
2037    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
2038  }
2039
2040  private CompletableFuture<List<ReplicationPeerDescription>>
2041    listReplicationPeers(ListReplicationPeersRequest request) {
2042    return this.<List<ReplicationPeerDescription>> newMasterCaller()
2043      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
2044        List<ReplicationPeerDescription>> call(controller, stub, request,
2045          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
2046          (resp) -> resp.getPeerDescList().stream()
2047            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
2048            .collect(Collectors.toList())))
2049      .call();
2050  }
2051
2052  @Override
2053  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
2054    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
2055    addListener(listTableDescriptors(), (tables, error) -> {
2056      if (!completeExceptionally(future, error)) {
2057        List<TableCFs> replicatedTableCFs = new ArrayList<>();
2058        tables.forEach(table -> {
2059          Map<String, Integer> cfs = new HashMap<>();
2060          Stream.of(table.getColumnFamilies())
2061            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
2062            .forEach(column -> {
2063              cfs.put(column.getNameAsString(), column.getScope());
2064            });
2065          if (!cfs.isEmpty()) {
2066            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
2067          }
2068        });
2069        future.complete(replicatedTableCFs);
2070      }
2071    });
2072    return future;
2073  }
2074
2075  @Override
2076  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
2077    SnapshotProtos.SnapshotDescription snapshot =
2078      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2079    try {
2080      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2081    } catch (IllegalArgumentException e) {
2082      return failedFuture(e);
2083    }
2084    CompletableFuture<Void> future = new CompletableFuture<>();
2085    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2086      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
2087    addListener(this.<SnapshotResponse> newMasterCaller()
2088      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
2089        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
2090      .call(), (resp, err) -> {
2091        if (err != null) {
2092          future.completeExceptionally(err);
2093          return;
2094        }
2095        waitSnapshotFinish(snapshotDesc, future, resp);
2096      });
2097    return future;
2098  }
2099
  // Completes {@code future} once the snapshot requested via {@code resp} has finished.
  // This is for keeping compatibility with old implementation.
  // If there is a procId field in the response, then the snapshot will be operated with a
  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
    SnapshotResponse resp) {
    if (resp.hasProcId()) {
      // Procedure path: getProcedureResult is handed the future to complete, and a
      // SnapshotProcedureBiConsumer is attached as a listener on it.
      getProcedureResult(resp.getProcId(), src -> null, future, 0);
      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
    } else {
      // Legacy path: poll isSnapshotFinished until it reports done or the server-provided
      // expected timeout has elapsed.
      long expectedTimeout = resp.getExpectedTimeout();
      TimerTask pollingTask = new TimerTask() {
        int tries = 0;
        long startTime = EnvironmentEdgeManager.currentTime();
        long endTime = startTime + expectedTimeout;
        // Upper bound for a single pause between two polls.
        long maxPauseTime = expectedTimeout / maxAttempts;

        @Override
        public void run(Timeout timeout) throws Exception {
          if (EnvironmentEdgeManager.currentTime() < endTime) {
            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else if (done) {
                future.complete(null);
              } else {
                // retry again after pauseTime.
                long pauseTime =
                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
                pauseTime = Math.min(pauseTime, maxPauseTime);
                // Re-schedule this same task; pauseTime is in milliseconds.
                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
              }
            });
          } else {
            // Deadline passed without the snapshot being reported as done.
            future
              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
          }
        }
      };
      // Kick off the first poll almost immediately.
      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
    }
  }
2142
2143  @Override
2144  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
2145    return this.<Boolean> newMasterCaller()
2146      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
2147        Boolean> call(controller, stub,
2148          IsSnapshotDoneRequest.newBuilder()
2149            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2150          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
2151      .call();
2152  }
2153
2154  @Override
2155  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
2156    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2157      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2158      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2159    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2160  }
2161
2162  @Override
2163  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2164    boolean restoreAcl) {
2165    CompletableFuture<Void> future = new CompletableFuture<>();
2166    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2167      if (err != null) {
2168        future.completeExceptionally(err);
2169        return;
2170      }
2171      TableName tableName = null;
2172      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2173        for (SnapshotDescription snap : snapshotDescriptions) {
2174          if (snap.getName().equals(snapshotName)) {
2175            tableName = snap.getTableName();
2176            break;
2177          }
2178        }
2179      }
2180      if (tableName == null) {
2181        future.completeExceptionally(
2182          new RestoreSnapshotException("The snapshot " + snapshotName + " does not exist."));
2183        return;
2184      }
2185      final TableName finalTableName = tableName;
2186      addListener(tableExists(finalTableName), (exists, err2) -> {
2187        if (err2 != null) {
2188          future.completeExceptionally(err2);
2189        } else if (!exists) {
2190          // if table does not exist, then just clone snapshot into new table.
2191          completeConditionalOnFuture(future,
2192            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2193        } else {
2194          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2195            if (err4 != null) {
2196              future.completeExceptionally(err4);
2197            } else if (!disabled) {
2198              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2199            } else {
2200              completeConditionalOnFuture(future,
2201                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2202            }
2203          });
2204        }
2205      });
2206    });
2207    return future;
2208  }
2209
  /**
   * Restores {@code snapshotName} onto {@code tableName}, optionally protected by a failsafe
   * snapshot of the table's current state.
   * <p>
   * Without {@code takeFailSafeSnapshot} this is a plain {@code internalRestoreSnapshot}. With it,
   * a failsafe snapshot is taken first; if the restore then fails, the table is rolled back to the
   * failsafe snapshot. The failsafe snapshot is deleted after a successful restore or a successful
   * rollback; a failure of that deletion is only logged.
   * @return a future that completes normally only when the restore itself succeeded; after a
   *         failed restore with a successful rollback it fails with a
   *         {@code RestoreSnapshotException} wrapping the restore error
   */
  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
    boolean takeFailSafeSnapshot, boolean restoreAcl) {
    if (takeFailSafeSnapshot) {
      CompletableFuture<Void> future = new CompletableFuture<>();
      // Step.1 Take a snapshot of the current state
      // The failsafe name comes from a configurable template whose placeholders are filled with
      // the snapshot name, the table name (namespace delimiter replaced by '.') and the current
      // time.
      String failSafeSnapshotSnapshotNameFormat =
        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
      final String failSafeSnapshotSnapshotName =
        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
        if (err != null) {
          // Could not take the failsafe snapshot; nothing has been changed yet.
          future.completeExceptionally(err);
        } else {
          // Step.2 Restore snapshot
          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
            (void2, err2) -> {
              if (err2 != null) {
                // Step.3.a Something went wrong during the restore and try to rollback.
                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
                  restoreAcl, null), (void3, err3) -> {
                    if (err3 != null) {
                      // Rollback failed as well: report the rollback error and keep the
                      // failsafe snapshot in place.
                      future.completeExceptionally(err3);
                    } else {
                      // If fail to restore snapshot but rollback successfully, delete the
                      // restore-failsafe snapshot.
                      LOG.info(
                        "Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
                      addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret4, err4) -> {
                        if (err4 != null) {
                          LOG.error("Unable to remove the failsafe snapshot: {}",
                            failSafeSnapshotSnapshotName, err4);
                        }
                        // Regardless of the cleanup outcome, the original restore failed.
                        String msg =
                          "Restore snapshot=" + snapshotName + " failed, Rollback to snapshot="
                            + failSafeSnapshotSnapshotName + " succeeded.";
                        LOG.error(msg);
                        future.completeExceptionally(new RestoreSnapshotException(msg, err2));
                      });
                    }
                  });
              } else {
                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
                  if (err3 != null) {
                    LOG.error(
                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
                      err3);
                  }
                  // The restore succeeded, so a failed cleanup does not fail the operation.
                  future.complete(ret3);
                });
              }
            });
        }
      });
      return future;
    } else {
      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
    }
  }
2274
2275  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2276    CompletableFuture<T> parentFuture) {
2277    addListener(parentFuture, (res, err) -> {
2278      if (err != null) {
2279        dependentFuture.completeExceptionally(err);
2280      } else {
2281        dependentFuture.complete(res);
2282      }
2283    });
2284  }
2285
2286  @Override
2287  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2288    boolean restoreAcl, String customSFT) {
2289    CompletableFuture<Void> future = new CompletableFuture<>();
2290    addListener(tableExists(tableName), (exists, err) -> {
2291      if (err != null) {
2292        future.completeExceptionally(err);
2293      } else if (exists) {
2294        future.completeExceptionally(new TableExistsException(tableName));
2295      } else {
2296        completeConditionalOnFuture(future,
2297          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2298      }
2299    });
2300    return future;
2301  }
2302
2303  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2304    boolean restoreAcl, String customSFT) {
2305    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2306      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2307    try {
2308      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2309    } catch (IllegalArgumentException e) {
2310      return failedFuture(e);
2311    }
2312    RestoreSnapshotRequest.Builder builder =
2313      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2314        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2315    if (customSFT != null) {
2316      builder.setCustomSFT(customSFT);
2317    }
2318    return waitProcedureResult(this.<Long> newMasterCaller()
2319      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2320        Long> call(controller, stub, builder.build(),
2321          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2322      .call(), result -> null);
2323  }
2324
2325  @Override
2326  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2327    return getCompletedSnapshots(null);
2328  }
2329
2330  @Override
2331  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2332    Preconditions.checkNotNull(pattern,
2333      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2334    return getCompletedSnapshots(pattern);
2335  }
2336
2337  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2338    return this.<List<SnapshotDescription>> newMasterCaller()
2339      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2340        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2341          GetCompletedSnapshotsRequest.newBuilder().build(),
2342          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2343          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2344      .call();
2345  }
2346
2347  @Override
2348  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2349    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2350      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2351    return getCompletedSnapshots(tableNamePattern, null);
2352  }
2353
2354  @Override
2355  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2356    Pattern snapshotNamePattern) {
2357    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2358      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2359    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2360      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2361    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2362  }
2363
2364  private CompletableFuture<List<SnapshotDescription>>
2365    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2366    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2367    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2368      if (err != null) {
2369        future.completeExceptionally(err);
2370        return;
2371      }
2372      if (tableNames == null || tableNames.size() <= 0) {
2373        future.complete(Collections.emptyList());
2374        return;
2375      }
2376      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2377        if (err2 != null) {
2378          future.completeExceptionally(err2);
2379          return;
2380        }
2381        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2382          future.complete(Collections.emptyList());
2383          return;
2384        }
2385        future.complete(snapshotDescList.stream()
2386          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2387          .collect(Collectors.toList()));
2388      });
2389    });
2390    return future;
2391  }
2392
2393  @Override
2394  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2395    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2396  }
2397
2398  @Override
2399  public CompletableFuture<Void> deleteSnapshots() {
2400    return internalDeleteSnapshots(null, null);
2401  }
2402
2403  @Override
2404  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2405    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2406      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2407    return internalDeleteSnapshots(null, snapshotNamePattern);
2408  }
2409
2410  @Override
2411  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2412    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2413      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2414    return internalDeleteSnapshots(tableNamePattern, null);
2415  }
2416
2417  @Override
2418  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2419    Pattern snapshotNamePattern) {
2420    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2421      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2422    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2423      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2424    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2425  }
2426
2427  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2428    Pattern snapshotNamePattern) {
2429    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2430    if (tableNamePattern == null) {
2431      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2432    } else {
2433      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2434    }
2435    CompletableFuture<Void> future = new CompletableFuture<>();
2436    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2437      if (err != null) {
2438        future.completeExceptionally(err);
2439        return;
2440      }
2441      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2442        future.complete(null);
2443        return;
2444      }
2445      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2446        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2447          if (e != null) {
2448            future.completeExceptionally(e);
2449          } else {
2450            future.complete(v);
2451          }
2452        });
2453    });
2454    return future;
2455  }
2456
2457  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2458    return this.<Void> newMasterCaller()
2459      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2460        controller, stub,
2461        DeleteSnapshotRequest.newBuilder()
2462          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2463        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2464      .call();
2465  }
2466
2467  @Override
2468  public CompletableFuture<Void> execProcedure(String signature, String instance,
2469    Map<String, String> props) {
2470    CompletableFuture<Void> future = new CompletableFuture<>();
2471    ProcedureDescription procDesc =
2472      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2473    addListener(this.<Long> newMasterCaller()
2474      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2475        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2476        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2477      .call(), (expectedTimeout, err) -> {
2478        if (err != null) {
2479          future.completeExceptionally(err);
2480          return;
2481        }
2482        TimerTask pollingTask = new TimerTask() {
2483          int tries = 0;
2484          long startTime = EnvironmentEdgeManager.currentTime();
2485          long endTime = startTime + expectedTimeout;
2486          long maxPauseTime = expectedTimeout / maxAttempts;
2487
2488          @Override
2489          public void run(Timeout timeout) throws Exception {
2490            if (EnvironmentEdgeManager.currentTime() < endTime) {
2491              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2492                if (err2 != null) {
2493                  future.completeExceptionally(err2);
2494                  return;
2495                }
2496                if (done) {
2497                  future.complete(null);
2498                } else {
2499                  // retry again after pauseTime.
2500                  long pauseTime =
2501                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2502                  pauseTime = Math.min(pauseTime, maxPauseTime);
2503                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2504                    TimeUnit.MICROSECONDS);
2505                }
2506              });
2507            } else {
2508              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2509                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2510            }
2511          }
2512        };
2513        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2514        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2515      });
2516    return future;
2517  }
2518
2519  @Override
2520  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2521    Map<String, String> props) {
2522    ProcedureDescription proDesc =
2523      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2524    return this.<byte[]> newMasterCaller()
2525      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2526        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2527        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2528        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2529      .call();
2530  }
2531
2532  @Override
2533  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2534    Map<String, String> props) {
2535    ProcedureDescription proDesc =
2536      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2537    return this.<Boolean> newMasterCaller()
2538      .action(
2539        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2540          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2541          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2542      .call();
2543  }
2544
2545  @Override
2546  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2547    return this.<Boolean> newMasterCaller().action(
2548      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2549        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2550        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2551      .call();
2552  }
2553
2554  @Override
2555  public CompletableFuture<String> getProcedures() {
2556    return this.<String> newMasterCaller()
2557      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2558        controller, stub, GetProceduresRequest.newBuilder().build(),
2559        (s, c, req, done) -> s.getProcedures(c, req, done),
2560        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2561      .call();
2562  }
2563
2564  @Override
2565  public CompletableFuture<String> getLocks() {
2566    return this.<String> newMasterCaller()
2567      .action(
2568        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2569          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2570          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2571      .call();
2572  }
2573
2574  @Override
2575  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2576    boolean offload) {
2577    return this.<Void> newMasterCaller()
2578      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2579        DecommissionRegionServersResponse, Void> call(controller, stub,
2580          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2581          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2582      .call();
2583  }
2584
2585  @Override
2586  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2587    return this.<List<ServerName>> newMasterCaller()
2588      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2589        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2590          ListDecommissionedRegionServersRequest.newBuilder().build(),
2591          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2592          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2593            .collect(Collectors.toList())))
2594      .call();
2595  }
2596
2597  @Override
2598  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2599    List<byte[]> encodedRegionNames) {
2600    return this.<Void> newMasterCaller()
2601      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2602        RecommissionRegionServerResponse, Void> call(controller, stub,
2603          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2604          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2605      .call();
2606  }
2607
2608  /**
2609   * Get the region location for the passed region name. The region name may be a full region name
2610   * or encoded region name. If the region does not found, then it'll throw an
2611   * UnknownRegionException wrapped by a {@link CompletableFuture}
2612   * @param regionNameOrEncodedRegionName region name or encoded region name
2613   * @return region location, wrapped by a {@link CompletableFuture}
2614   */
2615  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2616    if (regionNameOrEncodedRegionName == null) {
2617      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2618    }
2619
2620    CompletableFuture<Optional<HRegionLocation>> future;
2621    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2622      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2623      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2624        // old format encodedName, should be meta region
2625        future = connection.registry.getMetaRegionLocations()
2626          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2627            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2628      } else {
2629        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2630          regionNameOrEncodedRegionName);
2631      }
2632    } else {
2633      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2634      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2635      RegionInfo regionInfo;
2636      try {
2637        regionInfo =
2638          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2639      } catch (IOException ioe) {
2640        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2641      }
2642
2643      if (regionInfo.isMetaRegion()) {
2644        future = connection.registry.getMetaRegionLocations()
2645          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2646            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2647            .findFirst());
2648      } else {
2649        future =
2650          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2651      }
2652    }
2653
2654    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2655    addListener(future, (location, err) -> {
2656      if (err != null) {
2657        returnedFuture.completeExceptionally(err);
2658        return;
2659      }
2660      if (!location.isPresent() || location.get().getRegion() == null) {
2661        returnedFuture.completeExceptionally(
2662          new UnknownRegionException("Invalid region name or encoded region name: "
2663            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2664      } else {
2665        returnedFuture.complete(location.get());
2666      }
2667    });
2668    return returnedFuture;
2669  }
2670
2671  /**
2672   * Get the region info for the passed region name. The region name may be a full region name or
2673   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2674   * wrapped by a {@link CompletableFuture}
2675   * @return region info, wrapped by a {@link CompletableFuture}
2676   */
2677  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2678    if (regionNameOrEncodedRegionName == null) {
2679      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2680    }
2681
2682    if (
2683      Bytes.equals(regionNameOrEncodedRegionName,
2684        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2685        || Bytes.equals(regionNameOrEncodedRegionName,
2686          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2687    ) {
2688      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2689    }
2690
2691    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2692    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2693      if (err != null) {
2694        future.completeExceptionally(err);
2695      } else {
2696        future.complete(location.getRegion());
2697      }
2698    });
2699    return future;
2700  }
2701
2702  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2703    if (numRegions < 3) {
2704      throw new IllegalArgumentException("Must create at least three regions");
2705    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2706      throw new IllegalArgumentException("Start key must be smaller than end key");
2707    }
2708    if (numRegions == 3) {
2709      return new byte[][] { startKey, endKey };
2710    }
2711    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2712    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2713      throw new IllegalArgumentException("Unable to split key range into enough regions");
2714    }
2715    return splitKeys;
2716  }
2717
2718  private void verifySplitKeys(byte[][] splitKeys) {
2719    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2720    // Verify there are no duplicate split keys
2721    byte[] lastKey = null;
2722    for (byte[] splitKey : splitKeys) {
2723      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2724        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2725      }
2726      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2727        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2728          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2729      }
2730      lastKey = splitKey;
2731    }
2732  }
2733
2734  private static abstract class ProcedureBiConsumer<T> implements BiConsumer<T, Throwable> {
2735
2736    abstract void onFinished();
2737
2738    abstract void onError(Throwable error);
2739
2740    @Override
2741    public void accept(T value, Throwable error) {
2742      if (error != null) {
2743        onError(error);
2744        return;
2745      }
2746      onFinished();
2747    }
2748  }
2749
2750  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer<Void> {
2751    protected final TableName tableName;
2752
2753    TableProcedureBiConsumer(TableName tableName) {
2754      this.tableName = tableName;
2755    }
2756
2757    abstract String getOperationType();
2758
2759    String getDescription() {
2760      return "Operation: " + getOperationType() + ", " + "Table Name: "
2761        + tableName.getNameWithNamespaceInclAsString();
2762    }
2763
2764    @Override
2765    void onFinished() {
2766      LOG.info(getDescription() + " completed");
2767    }
2768
2769    @Override
2770    void onError(Throwable error) {
2771      LOG.info(getDescription() + " failed with " + error.getMessage());
2772    }
2773  }
2774
2775  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer<Void> {
2776    protected final String namespaceName;
2777
2778    NamespaceProcedureBiConsumer(String namespaceName) {
2779      this.namespaceName = namespaceName;
2780    }
2781
2782    abstract String getOperationType();
2783
2784    String getDescription() {
2785      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2786    }
2787
2788    @Override
2789    void onFinished() {
2790      LOG.info("{} completed", getDescription());
2791    }
2792
2793    @Override
2794    void onError(Throwable error) {
2795      LOG.info("{} failed with {}", getDescription(), error.getMessage());
2796    }
2797  }
2798
2799  private static class RestoreBackupSystemTableProcedureBiConsumer extends ProcedureBiConsumer {
2800
2801    @Override
2802    void onFinished() {
2803      LOG.info("RestoreBackupSystemTableProcedure completed");
2804    }
2805
2806    @Override
2807    void onError(Throwable error) {
2808      LOG.info("RestoreBackupSystemTableProcedure failed with {}", error.getMessage());
2809    }
2810  }
2811
2812  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2813
2814    CreateTableProcedureBiConsumer(TableName tableName) {
2815      super(tableName);
2816    }
2817
2818    @Override
2819    String getOperationType() {
2820      return "CREATE";
2821    }
2822  }
2823
2824  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2825
2826    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2827      super(tableName);
2828    }
2829
2830    @Override
2831    String getOperationType() {
2832      return "MODIFY";
2833    }
2834  }
2835
2836  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2837    extends TableProcedureBiConsumer {
2838
2839    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2840      super(tableName);
2841    }
2842
2843    @Override
2844    String getOperationType() {
2845      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2846    }
2847  }
2848
2849  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2850
2851    DeleteTableProcedureBiConsumer(TableName tableName) {
2852      super(tableName);
2853    }
2854
2855    @Override
2856    String getOperationType() {
2857      return "DELETE";
2858    }
2859
2860    @Override
2861    void onFinished() {
2862      connection.getLocator().clearCache(this.tableName);
2863      super.onFinished();
2864    }
2865  }
2866
2867  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2868
2869    TruncateTableProcedureBiConsumer(TableName tableName) {
2870      super(tableName);
2871    }
2872
2873    @Override
2874    String getOperationType() {
2875      return "TRUNCATE";
2876    }
2877  }
2878
2879  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2880
2881    EnableTableProcedureBiConsumer(TableName tableName) {
2882      super(tableName);
2883    }
2884
2885    @Override
2886    String getOperationType() {
2887      return "ENABLE";
2888    }
2889  }
2890
2891  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2892
2893    DisableTableProcedureBiConsumer(TableName tableName) {
2894      super(tableName);
2895    }
2896
2897    @Override
2898    String getOperationType() {
2899      return "DISABLE";
2900    }
2901  }
2902
2903  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2904
2905    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2906      super(tableName);
2907    }
2908
2909    @Override
2910    String getOperationType() {
2911      return "ADD_COLUMN_FAMILY";
2912    }
2913  }
2914
2915  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2916
2917    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2918      super(tableName);
2919    }
2920
2921    @Override
2922    String getOperationType() {
2923      return "DELETE_COLUMN_FAMILY";
2924    }
2925  }
2926
2927  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2928
2929    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2930      super(tableName);
2931    }
2932
2933    @Override
2934    String getOperationType() {
2935      return "MODIFY_COLUMN_FAMILY";
2936    }
2937  }
2938
2939  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2940    extends TableProcedureBiConsumer {
2941
2942    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2943      super(tableName);
2944    }
2945
2946    @Override
2947    String getOperationType() {
2948      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2949    }
2950  }
2951
2952  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
2953
2954    FlushTableProcedureBiConsumer(TableName tableName) {
2955      super(tableName);
2956    }
2957
2958    @Override
2959    String getOperationType() {
2960      return "FLUSH";
2961    }
2962  }
2963
2964  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2965
2966    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2967      super(namespaceName);
2968    }
2969
2970    @Override
2971    String getOperationType() {
2972      return "CREATE_NAMESPACE";
2973    }
2974  }
2975
2976  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2977
2978    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2979      super(namespaceName);
2980    }
2981
2982    @Override
2983    String getOperationType() {
2984      return "DELETE_NAMESPACE";
2985    }
2986  }
2987
2988  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2989
2990    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2991      super(namespaceName);
2992    }
2993
2994    @Override
2995    String getOperationType() {
2996      return "MODIFY_NAMESPACE";
2997    }
2998  }
2999
3000  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
3001
3002    MergeTableRegionProcedureBiConsumer(TableName tableName) {
3003      super(tableName);
3004    }
3005
3006    @Override
3007    String getOperationType() {
3008      return "MERGE_REGIONS";
3009    }
3010  }
3011
3012  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
3013
3014    SplitTableRegionProcedureBiConsumer(TableName tableName) {
3015      super(tableName);
3016    }
3017
3018    @Override
3019    String getOperationType() {
3020      return "SPLIT_REGION";
3021    }
3022  }
3023
3024  private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {
3025
3026    TruncateRegionProcedureBiConsumer(TableName tableName) {
3027      super(tableName);
3028    }
3029
3030    @Override
3031    String getOperationType() {
3032      return "TRUNCATE_REGION";
3033    }
3034  }
3035
3036  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
3037    SnapshotProcedureBiConsumer(TableName tableName) {
3038      super(tableName);
3039    }
3040
3041    @Override
3042    String getOperationType() {
3043      return "SNAPSHOT";
3044    }
3045  }
3046
3047  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer<Void> {
3048    private final String peerId;
3049    private final Supplier<String> getOperation;
3050
3051    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
3052      this.peerId = peerId;
3053      this.getOperation = getOperation;
3054    }
3055
3056    String getDescription() {
3057      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
3058    }
3059
3060    @Override
3061    void onFinished() {
3062      LOG.info("{} completed", getDescription());
3063    }
3064
3065    @Override
3066    void onError(Throwable error) {
3067      LOG.info("{} failed with {}", getDescription(), error.getMessage());
3068    }
3069  }
3070
3071  private static final class RollAllWALWritersBiConsumer
3072    extends ProcedureBiConsumer<Map<ServerName, Long>> {
3073
3074    @Override
3075    void onFinished() {
3076      LOG.info("Rolling all WAL writers completed");
3077    }
3078
3079    @Override
3080    void onError(Throwable error) {
3081      LOG.warn("Rolling all WAL writers failed with {}", error.getMessage());
3082    }
3083  }
3084
3085  private <T> CompletableFuture<T> waitProcedureResult(CompletableFuture<Long> procFuture,
3086    Converter<T, ByteString> converter) {
3087    CompletableFuture<T> future = new CompletableFuture<>();
3088    addListener(procFuture, (procId, error) -> {
3089      if (error != null) {
3090        future.completeExceptionally(error);
3091        return;
3092      }
3093      getProcedureResult(procId, converter, future, 0);
3094    });
3095    return future;
3096  }
3097
3098  private <T> void getProcedureResult(long procId, Converter<T, ByteString> converter,
3099    CompletableFuture<T> future, int retries) {
3100    addListener(
3101      this.<GetProcedureResultResponse> newMasterCaller()
3102        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
3103          GetProcedureResultResponse> call(controller, stub,
3104            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
3105            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
3106        .call(),
3107      (response, error) -> {
3108        if (error != null) {
3109          LOG.warn("failed to get the procedure result procId={}", procId,
3110            ConnectionUtils.translateException(error));
3111          retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1),
3112            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3113          return;
3114        }
3115        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
3116          retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1),
3117            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3118          return;
3119        }
3120        if (response.hasException()) {
3121          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
3122          future.completeExceptionally(ioe);
3123        } else {
3124          try {
3125            future.complete(converter.convert(response.getResult()));
3126          } catch (IOException e) {
3127            future.completeExceptionally(e);
3128          }
3129        }
3130      });
3131  }
3132
3133  private <T> CompletableFuture<T> failedFuture(Throwable error) {
3134    CompletableFuture<T> future = new CompletableFuture<>();
3135    future.completeExceptionally(error);
3136    return future;
3137  }
3138
3139  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
3140    if (error != null) {
3141      future.completeExceptionally(error);
3142      return true;
3143    }
3144    return false;
3145  }
3146
3147  @Override
3148  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
3149    return getClusterMetrics(EnumSet.allOf(Option.class));
3150  }
3151
3152  @Override
3153  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
3154    return this.<ClusterMetrics> newMasterCaller()
3155      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
3156        ClusterMetrics> call(controller, stub,
3157          RequestConverter.buildGetClusterStatusRequest(options),
3158          (s, c, req, done) -> s.getClusterStatus(c, req, done),
3159          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
3160      .call();
3161  }
3162
3163  @Override
3164  public CompletableFuture<Void> shutdown() {
3165    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3166      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
3167        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
3168        resp -> null))
3169      .call();
3170  }
3171
3172  @Override
3173  public CompletableFuture<Void> stopMaster() {
3174    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3175      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
3176        controller, stub, StopMasterRequest.newBuilder().build(),
3177        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
3178      .call();
3179  }
3180
3181  @Override
3182  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
3183    StopServerRequest request = RequestConverter
3184      .buildStopServerRequest("Called by admin client " + this.connection.toString());
3185    return this.<Void> newAdminCaller().priority(HIGH_QOS)
3186      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
3187        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
3188        resp -> null))
3189      .serverName(serverName).call();
3190  }
3191
3192  @Override
3193  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
3194    return this.<Void> newAdminCaller()
3195      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
3196        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
3197          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
3198      .serverName(serverName).call();
3199  }
3200
3201  @Override
3202  public CompletableFuture<Void> updateConfiguration() {
3203    CompletableFuture<Void> future = new CompletableFuture<Void>();
3204    addListener(
3205      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
3206      (status, err) -> {
3207        if (err != null) {
3208          future.completeExceptionally(err);
3209        } else {
3210          List<CompletableFuture<Void>> futures = new ArrayList<>();
3211          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
3212          futures.add(updateConfiguration(status.getMasterName()));
3213          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
3214          addListener(
3215            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3216            (result, err2) -> {
3217              if (err2 != null) {
3218                future.completeExceptionally(err2);
3219              } else {
3220                future.complete(result);
3221              }
3222            });
3223        }
3224      });
3225    return future;
3226  }
3227
3228  @Override
3229  public CompletableFuture<Void> updateConfiguration(String groupName) {
3230    CompletableFuture<Void> future = new CompletableFuture<Void>();
3231    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3232      if (err != null) {
3233        future.completeExceptionally(err);
3234      } else if (rsGroupInfo == null) {
3235        future.completeExceptionally(
3236          new IllegalArgumentException("Group does not exist: " + groupName));
3237      } else {
3238        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3239          if (err2 != null) {
3240            future.completeExceptionally(err2);
3241          } else {
3242            List<CompletableFuture<Void>> futures = new ArrayList<>();
3243            List<ServerName> groupServers = status.getServersName().stream()
3244              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3245            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3246            addListener(
3247              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3248              (result, err3) -> {
3249                if (err3 != null) {
3250                  future.completeExceptionally(err3);
3251                } else {
3252                  future.complete(result);
3253                }
3254              });
3255          }
3256        });
3257      }
3258    });
3259    return future;
3260  }
3261
3262  @Override
3263  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3264    return this.<Void> newAdminCaller()
3265      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3266        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3267          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3268      .serverName(serverName).call();
3269  }
3270
3271  @Override
3272  public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() {
3273    return this
3274      .<RollAllWALWritersRequest, RollAllWALWritersResponse,
3275        Map<ServerName,
3276          Long>> procedureCall(
3277            RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(), ng.newNonce()),
3278            (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp -> resp.getProcId(),
3279            result -> LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap()
3280              .entrySet().stream().collect(Collectors
3281                .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()), Map.Entry::getValue)),
3282            new RollAllWALWritersBiConsumer());
3283  }
3284
3285  @Override
3286  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3287    return this.<Void> newAdminCaller()
3288      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3289        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3290          RequestConverter.buildClearCompactionQueuesRequest(queues),
3291          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3292      .serverName(serverName).call();
3293  }
3294
3295  @Override
3296  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3297    return this.<List<SecurityCapability>> newMasterCaller()
3298      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3299        List<SecurityCapability>> call(controller, stub,
3300          SecurityCapabilitiesRequest.newBuilder().build(),
3301          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3302          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3303      .call();
3304  }
3305
3306  @Override
3307  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3308    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3309  }
3310
3311  @Override
3312  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3313    TableName tableName) {
3314    Preconditions.checkNotNull(tableName,
3315      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3316    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3317  }
3318
3319  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3320    ServerName serverName) {
3321    return this.<List<RegionMetrics>> newAdminCaller()
3322      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3323        List<RegionMetrics>> adminCall(controller, stub, request,
3324          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3325          RegionMetricsBuilder::toRegionMetrics))
3326      .serverName(serverName).call();
3327  }
3328
3329  @Override
3330  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3331    return this.<Boolean> newMasterCaller()
3332      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3333        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3334          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3335          resp -> resp.getInMaintenanceMode()))
3336      .call();
3337  }
3338
  /**
   * Determines the compaction state of a table for the given compaction type.
   * <ul>
   * <li>{@code MOB}: looks up the active master through the connection registry and sends it a
   * region info request for the table's mob region info; the result is the compaction state
   * carried by the response, or {@code NONE} when the response has none.</li>
   * <li>{@code NORMAL}: looks up the table's region locations, queries every located, non-offline
   * region through {@link #getCompactionStateForRegion(byte[])} and merges the per-region
   * states.</li>
   * </ul>
   * @param tableName   the table to inspect
   * @param compactType which kind of compaction to report on
   * @return a future holding the table's compaction state
   * @throws IllegalArgumentException synchronously, when {@code compactType} is neither
   *                                  {@code MOB} nor {@code NORMAL}
   */
  @Override
  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
    CompactType compactType) {
    CompletableFuture<CompactionState> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);

          // The request is sent to the active master (serverName) resolved above.
          // NOTE(review): the 'true' flag presumably asks for the compaction state to be included
          // in the response - confirm against RequestConverter.buildGetRegionInfoRequest.
          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
              GetRegionInfoResponse> adminCall(controller, stub,
                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
            .call(), (resp2, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else {
                if (resp2.hasCompactionState()) {
                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
                } else {
                  future.complete(CompactionState.NONE);
                }
              }
            });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          // Collects the states of regions that did not decide the overall result on their own.
          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
          // Only regions that have a server, a region info and are not offline are queried.
          locations.stream().filter(loc -> loc.getServerName() != null)
            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
                // If any region compaction state is MAJOR_AND_MINOR
                // the table compaction state is MAJOR_AND_MINOR, too.
                if (err2 != null) {
                  // The first failing region fails the whole call.
                  future.completeExceptionally(unwrapCompletionException(err2));
                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
                  future.complete(regionState);
                } else {
                  regionStates.add(regionState);
                }
              }));
            });
          // Runs once every per-region query has finished, successfully or not.
          addListener(
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
            (ret, err3) -> {
              // If future not completed, check all regions's compaction state
              if (!future.isCompletedExceptionally() && !future.isDone()) {
                CompactionState state = CompactionState.NONE;
                for (CompactionState regionState : regionStates) {
                  switch (regionState) {
                    case MAJOR:
                      // Seeing both MAJOR and MINOR across regions yields MAJOR_AND_MINOR.
                      if (state == CompactionState.MINOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MAJOR;
                      }
                      break;
                    case MINOR:
                      if (state == CompactionState.MAJOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MINOR;
                      }
                      break;
                    case NONE:
                    default:
                  }
                }
                // Not decided inside the loop: report the last MAJOR/MINOR seen, or NONE.
                if (!future.isDone()) {
                  future.complete(state);
                }
              }
            });
        });
        break;
      default:
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }

    return future;
  }
3433
3434  @Override
3435  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3436    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3437    addListener(getRegionLocation(regionName), (location, err) -> {
3438      if (err != null) {
3439        future.completeExceptionally(err);
3440        return;
3441      }
3442      ServerName serverName = location.getServerName();
3443      if (serverName == null) {
3444        future
3445          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3446        return;
3447      }
3448      addListener(
3449        this.<GetRegionInfoResponse> newAdminCaller()
3450          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3451            GetRegionInfoResponse> adminCall(controller, stub,
3452              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3453                true),
3454              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3455          .serverName(serverName).call(),
3456        (resp2, err2) -> {
3457          if (err2 != null) {
3458            future.completeExceptionally(err2);
3459          } else {
3460            if (resp2.hasCompactionState()) {
3461              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3462            } else {
3463              future.complete(CompactionState.NONE);
3464            }
3465          }
3466        });
3467    });
3468    return future;
3469  }
3470
3471  @Override
3472  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3473    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3474      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3475    return this.<Optional<Long>> newMasterCaller()
3476      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3477        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3478          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3479          ProtobufUtil::toOptionalTimestamp))
3480      .call();
3481  }
3482
3483  @Override
3484  public CompletableFuture<Optional<Long>>
3485    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3486    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3487    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3488    addListener(getRegionInfo(regionName), (region, err) -> {
3489      if (err != null) {
3490        future.completeExceptionally(err);
3491        return;
3492      }
3493      MajorCompactionTimestampForRegionRequest.Builder builder =
3494        MajorCompactionTimestampForRegionRequest.newBuilder();
3495      builder.setRegion(
3496        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3497      addListener(this.<Optional<Long>> newMasterCaller()
3498        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3499          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3500            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3501            ProtobufUtil::toOptionalTimestamp))
3502        .call(), (timestamp, err2) -> {
3503          if (err2 != null) {
3504            future.completeExceptionally(err2);
3505          } else {
3506            future.complete(timestamp);
3507          }
3508        });
3509    });
3510    return future;
3511  }
3512
3513  @Override
3514  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3515    List<String> serverNamesList) {
3516    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3517    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3518      if (err != null) {
3519        future.completeExceptionally(err);
3520        return;
3521      }
3522      // Accessed by multiple threads.
3523      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3524      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3525      serverNames.stream().forEach(serverName -> {
3526        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3527          if (err2 != null) {
3528            future.completeExceptionally(unwrapCompletionException(err2));
3529          } else {
3530            serverStates.put(serverName, serverState);
3531          }
3532        }));
3533      });
3534      addListener(
3535        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3536        (ret, err3) -> {
3537          if (!future.isCompletedExceptionally()) {
3538            if (err3 != null) {
3539              future.completeExceptionally(err3);
3540            } else {
3541              future.complete(serverStates);
3542            }
3543          }
3544        });
3545    });
3546    return future;
3547  }
3548
3549  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3550    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3551    if (serverNamesList.isEmpty()) {
3552      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3553        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3554      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3555        if (err != null) {
3556          future.completeExceptionally(err);
3557        } else {
3558          future.complete(clusterMetrics.getServersName());
3559        }
3560      });
3561      return future;
3562    } else {
3563      List<ServerName> serverList = new ArrayList<>();
3564      for (String regionServerName : serverNamesList) {
3565        ServerName serverName = null;
3566        try {
3567          serverName = ServerName.valueOf(regionServerName);
3568        } catch (Exception e) {
3569          future.completeExceptionally(
3570            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3571        }
3572        if (serverName == null) {
3573          future.completeExceptionally(
3574            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3575        } else {
3576          serverList.add(serverName);
3577        }
3578      }
3579      future.complete(serverList);
3580    }
3581    return future;
3582  }
3583
3584  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3585    return this.<Boolean> newAdminCaller().serverName(serverName)
3586      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3587        Boolean> adminCall(controller, stub,
3588          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3589          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3590      .call();
3591  }
3592
3593  @Override
3594  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3595    return this.<Boolean> newMasterCaller()
3596      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3597        Boolean> call(controller, stub,
3598          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3599          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3600          (resp) -> resp.getPrevBalanceValue()))
3601      .call();
3602  }
3603
3604  @Override
3605  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3606    return this.<BalanceResponse> newMasterCaller()
3607      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3608        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3609          (s, c, req, done) -> s.balance(c, req, done),
3610          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3611      .call();
3612  }
3613
3614  @Override
3615  public CompletableFuture<Boolean> isBalancerEnabled() {
3616    return this.<Boolean> newMasterCaller()
3617      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3618        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3619          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3620      .call();
3621  }
3622
3623  @Override
3624  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3625    return this.<Boolean> newMasterCaller()
3626      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3627        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3628          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3629          (resp) -> resp.getPrevNormalizerValue()))
3630      .call();
3631  }
3632
3633  @Override
3634  public CompletableFuture<Boolean> isNormalizerEnabled() {
3635    return this.<Boolean> newMasterCaller()
3636      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3637        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3638          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3639      .call();
3640  }
3641
3642  @Override
3643  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3644    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3645  }
3646
3647  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3648    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3649      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3650  }
3651
3652  @Override
3653  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3654    return this.<Boolean> newMasterCaller()
3655      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3656        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3657          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3658          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3659          (resp) -> resp.getPrevValue()))
3660      .call();
3661  }
3662
3663  @Override
3664  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3665    return this.<Boolean> newMasterCaller()
3666      .action(
3667        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3668          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3669            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3670      .call();
3671  }
3672
3673  @Override
3674  public CompletableFuture<Boolean> runCleanerChore() {
3675    return this.<Boolean> newMasterCaller()
3676      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3677        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3678          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3679          (resp) -> resp.getCleanerChoreRan()))
3680      .call();
3681  }
3682
3683  @Override
3684  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3685    return this.<Boolean> newMasterCaller()
3686      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3687        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3688          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3689      .call();
3690  }
3691
3692  @Override
3693  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3694    return this.<Boolean> newMasterCaller()
3695      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3696        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3697          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3698          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3699      .call();
3700  }
3701
3702  @Override
3703  public CompletableFuture<Integer> runCatalogJanitor() {
3704    return this.<Integer> newMasterCaller()
3705      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3706        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3707          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3708      .call();
3709  }
3710
3711  @Override
3712  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3713    ServiceCaller<S, R> callable) {
3714    MasterCoprocessorRpcChannelImpl channel =
3715      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3716    S stub = stubMaker.apply(channel);
3717    CompletableFuture<R> future = new CompletableFuture<>();
3718    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3719    callable.call(stub, controller, resp -> {
3720      if (controller.failed()) {
3721        future.completeExceptionally(controller.getFailed());
3722      } else {
3723        future.complete(resp);
3724      }
3725    });
3726    return future;
3727  }
3728
3729  @Override
3730  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3731    ServiceCaller<S, R> callable, ServerName serverName) {
3732    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3733      this.<Message> newServerCaller().serverName(serverName));
3734    S stub = stubMaker.apply(channel);
3735    CompletableFuture<R> future = new CompletableFuture<>();
3736    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3737    callable.call(stub, controller, resp -> {
3738      if (controller.failed()) {
3739        future.completeExceptionally(controller.getFailed());
3740      } else {
3741        future.complete(resp);
3742      }
3743    });
3744    return future;
3745  }
3746
3747  @Override
3748  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3749    return this.<List<ServerName>> newMasterCaller()
3750      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3751        List<ServerName>> call(controller, stub,
3752          RequestConverter.buildClearDeadServersRequest(servers),
3753          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3754          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3755      .call();
3756  }
3757
3758  <T> ServerRequestCallerBuilder<T> newServerCaller() {
3759    return this.connection.callerFactory.<T> serverRequest()
3760      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
3761      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
3762      .pause(pauseNs, TimeUnit.NANOSECONDS)
3763      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
3764      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
3765  }
3766
3767  @Override
3768  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3769    if (tableName == null) {
3770      return failedFuture(new IllegalArgumentException("Table name is null"));
3771    }
3772    CompletableFuture<Void> future = new CompletableFuture<>();
3773    addListener(tableExists(tableName), (exist, err) -> {
3774      if (err != null) {
3775        future.completeExceptionally(err);
3776        return;
3777      }
3778      if (!exist) {
3779        future.completeExceptionally(new TableNotFoundException(
3780          "Table '" + tableName.getNameAsString() + "' does not exists."));
3781        return;
3782      }
3783      addListener(getTableSplits(tableName), (splits, err1) -> {
3784        if (err1 != null) {
3785          future.completeExceptionally(err1);
3786        } else {
3787          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3788            if (err2 != null) {
3789              future.completeExceptionally(err2);
3790            } else {
3791              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3792                if (err3 != null) {
3793                  future.completeExceptionally(err3);
3794                } else {
3795                  future.complete(result3);
3796                }
3797              });
3798            }
3799          });
3800        }
3801      });
3802    });
3803    return future;
3804  }
3805
3806  @Override
3807  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3808    if (tableName == null) {
3809      return failedFuture(new IllegalArgumentException("Table name is null"));
3810    }
3811    CompletableFuture<Void> future = new CompletableFuture<>();
3812    addListener(tableExists(tableName), (exist, err) -> {
3813      if (err != null) {
3814        future.completeExceptionally(err);
3815        return;
3816      }
3817      if (!exist) {
3818        future.completeExceptionally(new TableNotFoundException(
3819          "Table '" + tableName.getNameAsString() + "' does not exists."));
3820        return;
3821      }
3822      addListener(setTableReplication(tableName, false), (result, err2) -> {
3823        if (err2 != null) {
3824          future.completeExceptionally(err2);
3825        } else {
3826          future.complete(result);
3827        }
3828      });
3829    });
3830    return future;
3831  }
3832
3833  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3834    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3835    addListener(
3836      getRegions(tableName).thenApply(regions -> regions.stream()
3837        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3838      (regions, err2) -> {
3839        if (err2 != null) {
3840          future.completeExceptionally(err2);
3841          return;
3842        }
3843        if (regions.size() == 1) {
3844          future.complete(null);
3845        } else {
3846          byte[][] splits = new byte[regions.size() - 1][];
3847          for (int i = 1; i < regions.size(); i++) {
3848            splits[i - 1] = regions.get(i).getStartKey();
3849          }
3850          future.complete(splits);
3851        }
3852      });
3853    return future;
3854  }
3855
3856  /**
3857   * Connect to peer and check the table descriptor on peer:
3858   * <ol>
3859   * <li>Create the same table on peer when not exist.</li>
3860   * <li>Throw an exception if the table already has replication enabled on any of the column
3861   * families.</li>
3862   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3863   * </ol>
3864   * @param tableName name of the table to sync to the peer
3865   * @param splits    table split keys
3866   */
3867  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3868    byte[][] splits) {
3869    CompletableFuture<Void> future = new CompletableFuture<>();
3870    addListener(listReplicationPeers(), (peers, err) -> {
3871      if (err != null) {
3872        future.completeExceptionally(err);
3873        return;
3874      }
3875      if (peers == null || peers.size() <= 0) {
3876        future.completeExceptionally(
3877          new IllegalArgumentException("Found no peer cluster for replication."));
3878        return;
3879      }
3880      List<CompletableFuture<Void>> futures = new ArrayList<>();
3881      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3882        .forEach(peer -> {
3883          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3884        });
3885      addListener(
3886        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3887        (result, err2) -> {
3888          if (err2 != null) {
3889            future.completeExceptionally(err2);
3890          } else {
3891            future.complete(result);
3892          }
3893        });
3894    });
3895    return future;
3896  }
3897
3898  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3899    ReplicationPeerDescription peer) {
3900    Configuration peerConf;
3901    try {
3902      peerConf = ReplicationPeerConfigUtil
3903        .getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig());
3904    } catch (IOException e) {
3905      return failedFuture(e);
3906    }
3907    URI connectionUri =
3908      ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey());
3909    CompletableFuture<Void> future = new CompletableFuture<>();
3910    addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> {
3911      if (err != null) {
3912        future.completeExceptionally(err);
3913        return;
3914      }
3915      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3916        if (err1 != null) {
3917          future.completeExceptionally(err1);
3918          return;
3919        }
3920        AsyncAdmin peerAdmin = conn.getAdmin();
3921        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3922          if (err2 != null) {
3923            future.completeExceptionally(err2);
3924            return;
3925          }
3926          if (!exist) {
3927            CompletableFuture<Void> createTableFuture = null;
3928            if (splits == null) {
3929              createTableFuture = peerAdmin.createTable(tableDesc);
3930            } else {
3931              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3932            }
3933            addListener(createTableFuture, (result, err3) -> {
3934              if (err3 != null) {
3935                future.completeExceptionally(err3);
3936              } else {
3937                future.complete(result);
3938              }
3939            });
3940          } else {
3941            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3942              (result, err4) -> {
3943                if (err4 != null) {
3944                  future.completeExceptionally(err4);
3945                } else {
3946                  future.complete(result);
3947                }
3948              });
3949          }
3950        });
3951      });
3952    });
3953    return future;
3954  }
3955
3956  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3957    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3958    CompletableFuture<Void> future = new CompletableFuture<>();
3959    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3960      if (err != null) {
3961        future.completeExceptionally(err);
3962        return;
3963      }
3964      if (peerTableDesc == null) {
3965        future.completeExceptionally(
3966          new IllegalArgumentException("Failed to get table descriptor for table "
3967            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3968        return;
3969      }
3970      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3971        future.completeExceptionally(new IllegalArgumentException(
3972          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3973            + ", but the table descriptors are not same when compared with source cluster."
3974            + " Thus can not enable the table's replication switch."));
3975        return;
3976      }
3977      future.complete(null);
3978    });
3979    return future;
3980  }
3981
3982  /**
3983   * Set the table's replication switch if the table's replication switch is already not set.
3984   * @param tableName name of the table
3985   * @param enableRep is replication switch enable or disable
3986   */
3987  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3988    CompletableFuture<Void> future = new CompletableFuture<>();
3989    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3990      if (err != null) {
3991        future.completeExceptionally(err);
3992        return;
3993      }
3994      if (!tableDesc.matchReplicationScope(enableRep)) {
3995        int scope =
3996          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3997        TableDescriptor newTableDesc =
3998          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3999        addListener(modifyTable(newTableDesc), (result, err2) -> {
4000          if (err2 != null) {
4001            future.completeExceptionally(err2);
4002          } else {
4003            future.complete(result);
4004          }
4005        });
4006      } else {
4007        future.complete(null);
4008      }
4009    });
4010    return future;
4011  }
4012
4013  @Override
4014  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
4015    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
4016    request.setPeerId(peerId);
4017    return this.<Boolean> newMasterCaller()
4018      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
4019        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
4020          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
4021          resp -> resp.getIsEnabled()))
4022      .call();
4023  }
4024
4025  private void waitUntilAllReplicationPeerModificationProceduresDone(
4026    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
4027    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
4028      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
4029        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
4030          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
4031            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
4032            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
4033            resp -> resp.getProcedureList()))
4034        .call();
4035    addListener(callFuture, (r, e) -> {
4036      if (e != null) {
4037        future.completeExceptionally(e);
4038      } else if (r.isEmpty()) {
4039        // we are done
4040        future.complete(prevOn);
4041      } else {
4042        // retry later to see if the procedures are done
4043        retryTimer.newTimeout(
4044          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
4045          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
4046      }
4047    });
4048  }
4049
4050  @Override
4051  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
4052    boolean drainProcedures) {
4053    ReplicationPeerModificationSwitchRequest request =
4054      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
4055    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
4056      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
4057        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
4058          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
4059          resp -> resp.getPreviousValue()))
4060      .call();
4061    // if we do not need to wait all previous peer modification procedure done, or we are enabling
4062    // peer modification, just return here.
4063    if (!drainProcedures || on) {
4064      return callFuture;
4065    }
4066    // otherwise we need to wait until all previous peer modification procedure done
4067    CompletableFuture<Boolean> future = new CompletableFuture<>();
4068    addListener(callFuture, (prevOn, err) -> {
4069      if (err != null) {
4070        future.completeExceptionally(err);
4071        return;
4072      }
4073      // even if the previous state is disabled, we still need to wait here, as there could be
4074      // another client thread which called this method just before us and have already changed the
4075      // state to off, but there are still peer modification procedures not finished, so we should
4076      // also wait here.
4077      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
4078    });
4079    return future;
4080  }
4081
4082  @Override
4083  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
4084    return this.<Boolean> newMasterCaller()
4085      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
4086        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
4087          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
4088          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
4089          (resp) -> resp.getEnabled()))
4090      .call();
4091  }
4092
4093  @Override
4094  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
4095    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
4096    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
4097      if (err != null) {
4098        future.completeExceptionally(err);
4099        return;
4100      }
4101      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
4102        locations.stream().filter(l -> l.getRegion() != null)
4103          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
4104          .collect(Collectors.groupingBy(l -> l.getServerName(),
4105            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
4106      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
4107      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
4108      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
4109        futures
4110          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
4111            if (err2 != null) {
4112              future.completeExceptionally(unwrapCompletionException(err2));
4113            } else {
4114              aggregator.append(stats);
4115            }
4116          }));
4117      }
4118      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
4119        (ret, err3) -> {
4120          if (err3 != null) {
4121            future.completeExceptionally(unwrapCompletionException(err3));
4122          } else {
4123            future.complete(aggregator.sum());
4124          }
4125        });
4126    });
4127    return future;
4128  }
4129
4130  @Override
4131  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
4132    boolean preserveSplits) {
4133    CompletableFuture<Void> future = new CompletableFuture<>();
4134    addListener(tableExists(tableName), (exist, err) -> {
4135      if (err != null) {
4136        future.completeExceptionally(err);
4137        return;
4138      }
4139      if (!exist) {
4140        future.completeExceptionally(new TableNotFoundException(tableName));
4141        return;
4142      }
4143      addListener(tableExists(newTableName), (exist1, err1) -> {
4144        if (err1 != null) {
4145          future.completeExceptionally(err1);
4146          return;
4147        }
4148        if (exist1) {
4149          future.completeExceptionally(new TableExistsException(newTableName));
4150          return;
4151        }
4152        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
4153          if (err2 != null) {
4154            future.completeExceptionally(err2);
4155            return;
4156          }
4157          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
4158          if (preserveSplits) {
4159            addListener(getTableSplits(tableName), (splits, err3) -> {
4160              if (err3 != null) {
4161                future.completeExceptionally(err3);
4162              } else {
4163                addListener(
4164                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
4165                  (result, err4) -> {
4166                    if (err4 != null) {
4167                      future.completeExceptionally(err4);
4168                    } else {
4169                      future.complete(result);
4170                    }
4171                  });
4172              }
4173            });
4174          } else {
4175            addListener(createTable(newTableDesc), (result, err5) -> {
4176              if (err5 != null) {
4177                future.completeExceptionally(err5);
4178              } else {
4179                future.complete(result);
4180              }
4181            });
4182          }
4183        });
4184      });
4185    });
4186    return future;
4187  }
4188
4189  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
4190    List<RegionInfo> hris) {
4191    return this.<CacheEvictionStats> newAdminCaller()
4192      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
4193        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
4194          RequestConverter.buildClearRegionBlockCacheRequest(hris),
4195          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
4196          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
4197      .serverName(serverName).call();
4198  }
4199
4200  @Override
4201  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
4202    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4203      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
4204        Boolean> call(controller, stub,
4205          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
4206          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
4207          resp -> resp.getPreviousRpcThrottleEnabled()))
4208      .call();
4209    return future;
4210  }
4211
4212  @Override
4213  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
4214    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4215      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
4216        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
4217          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
4218          resp -> resp.getRpcThrottleEnabled()))
4219      .call();
4220    return future;
4221  }
4222
4223  @Override
4224  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
4225    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4226      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
4227        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
4228          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
4229            .build(),
4230          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
4231          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
4232      .call();
4233    return future;
4234  }
4235
4236  @Override
4237  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
4238    return this.<Map<TableName, Long>> newMasterCaller()
4239      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
4240        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4241          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4242          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4243          resp -> resp.getSizesList().stream().collect(Collectors
4244            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4245      .call();
4246  }
4247
4248  @Override
4249  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4250    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4251    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4252      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4253        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4254          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4255          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4256          resp -> resp.getSnapshotsList().stream()
4257            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4258              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4259      .serverName(serverName).call();
4260  }
4261
4262  private CompletableFuture<SpaceQuotaSnapshot>
4263    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4264    return this.<SpaceQuotaSnapshot> newMasterCaller()
4265      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4266        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4267          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4268      .call();
4269  }
4270
4271  @Override
4272  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4273    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4274      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4275      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4276  }
4277
4278  @Override
4279  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4280    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4281    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4282      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4283      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4284  }
4285
4286  @Override
4287  public CompletableFuture<Void> grant(UserPermission userPermission,
4288    boolean mergeExistingPermissions) {
4289    return this.<Void> newMasterCaller()
4290      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4291        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4292        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4293      .call();
4294  }
4295
4296  @Override
4297  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4298    return this.<Void> newMasterCaller()
4299      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4300        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4301        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4302      .call();
4303  }
4304
4305  @Override
4306  public CompletableFuture<List<UserPermission>>
4307    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4308    return this.<List<UserPermission>> newMasterCaller()
4309      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4310        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4311          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4312          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4313          resp -> resp.getUserPermissionList().stream()
4314            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4315            .collect(Collectors.toList())))
4316      .call();
4317  }
4318
4319  @Override
4320  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4321    List<Permission> permissions) {
4322    return this.<List<Boolean>> newMasterCaller()
4323      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4324        List<Boolean>> call(controller, stub,
4325          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4326          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4327          resp -> resp.getHasUserPermissionList()))
4328      .call();
4329  }
4330
4331  @Override
4332  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4333    return this.<Boolean> newMasterCaller()
4334      .action((controller, stub) -> this.call(controller, stub,
4335        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4336        MasterService.Interface::switchSnapshotCleanup,
4337        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4338      .call();
4339  }
4340
4341  @Override
4342  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4343    return this.<Boolean> newMasterCaller()
4344      .action((controller, stub) -> this.call(controller, stub,
4345        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4346        MasterService.Interface::isSnapshotCleanupEnabled,
4347        IsSnapshotCleanupEnabledResponse::getEnabled))
4348      .call();
4349  }
4350
4351  @Override
4352  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4353    return this.<Void> newMasterCaller()
4354      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4355        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4356        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4357      .call();
4358  }
4359
4360  @Override
4361  public CompletableFuture<Void> addRSGroup(String groupName) {
4362    return this.<Void> newMasterCaller()
4363      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4364        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4365        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4366      .call();
4367  }
4368
4369  @Override
4370  public CompletableFuture<Void> removeRSGroup(String groupName) {
4371    return this.<Void> newMasterCaller()
4372      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4373        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4374        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4375      .call();
4376  }
4377
4378  @Override
4379  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4380    BalanceRequest request) {
4381    return this.<BalanceResponse> newMasterCaller()
4382      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4383        BalanceResponse> call(controller, stub,
4384          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4385          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4386      .call();
4387  }
4388
4389  @Override
4390  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4391    return this.<List<RSGroupInfo>> newMasterCaller()
4392      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4393        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4394          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4395            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4396      .call();
4397  }
4398
4399  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4400    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4401    final String logType) {
4402    if (CollectionUtils.isEmpty(serverNames)) {
4403      return CompletableFuture.completedFuture(Collections.emptyList());
4404    }
4405    return CompletableFuture.supplyAsync(() -> serverNames
4406      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4407        filterParams, limit, logType))
4408      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4409  }
4410
4411  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4412    Map<String, Object> filterParams, int limit, String logType) {
4413    return this.<List<LogEntry>> newAdminCaller()
4414      .action((controller, stub) -> this.adminCall(controller, stub,
4415        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4416        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4417      .serverName(serverName).call();
4418  }
4419
4420  @Override
4421  public CompletableFuture<List<Boolean>>
4422    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4423    if (CollectionUtils.isEmpty(serverNames)) {
4424      return CompletableFuture.completedFuture(Collections.emptyList());
4425    }
4426    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4427      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4428    return convertToFutureOfList(clearSlowLogResponseList);
4429  }
4430
4431  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4432    return this.<Boolean> newAdminCaller()
4433      .action((controller, stub) -> this.adminCall(controller, stub,
4434        RequestConverter.buildClearSlowLogResponseRequest(),
4435        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4436      .serverName(serverName).call();
4437  }
4438
4439  private static <T> CompletableFuture<List<T>>
4440    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4441    CompletableFuture<Void> allDoneFuture =
4442      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4443    return allDoneFuture
4444      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4445  }
4446
4447  @Override
4448  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4449    return this.<List<TableName>> newMasterCaller()
4450      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4451        List<TableName>> call(controller, stub,
4452          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4453          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4454            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4455      .call();
4456  }
4457
4458  @Override
4459  public CompletableFuture<Pair<List<String>, List<TableName>>>
4460    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4461    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4462      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4463        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4464        Pair<List<String>, List<TableName>>> call(controller, stub,
4465          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4466            .build(),
4467          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4468          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4469            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4470      .call();
4471  }
4472
4473  @Override
4474  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4475    return this.<RSGroupInfo> newMasterCaller()
4476      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4477        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4478          GetRSGroupInfoOfServerRequest.newBuilder()
4479            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4480              .setPort(hostPort.getPort()).build())
4481            .build(),
4482          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4483          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4484      .call();
4485  }
4486
4487  @Override
4488  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4489    return this.<Void> newMasterCaller()
4490      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4491        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4492        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4493      .call();
4494  }
4495
4496  @Override
4497  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4498    CompletableFuture<Void> future = new CompletableFuture<>();
4499    for (TableName tableName : tables) {
4500      addListener(tableExists(tableName), (exist, err) -> {
4501        if (err != null) {
4502          future.completeExceptionally(err);
4503          return;
4504        }
4505        if (!exist) {
4506          future.completeExceptionally(new TableNotFoundException(tableName));
4507          return;
4508        }
4509      });
4510    }
4511    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4512      if (err != null) {
4513        future.completeExceptionally(err);
4514        return;
4515      }
4516      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4517        future.complete(null);
4518        return;
4519      }
4520      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4521      for (TableDescriptor td : tableDescriptions) {
4522        newTableDescriptors
4523          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4524      }
4525      addListener(
4526        CompletableFuture.allOf(
4527          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4528        (v, e) -> {
4529          if (e != null) {
4530            future.completeExceptionally(e);
4531          } else {
4532            future.complete(v);
4533          }
4534        });
4535    });
4536    return future;
4537  }
4538
4539  @Override
4540  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4541    return this.<RSGroupInfo> newMasterCaller()
4542      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4543        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4544          GetRSGroupInfoOfTableRequest.newBuilder()
4545            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4546          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4547          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4548      .call();
4549  }
4550
4551  @Override
4552  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4553    return this.<RSGroupInfo> newMasterCaller()
4554      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4555        RSGroupInfo> call(controller, stub,
4556          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4557          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4558          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4559      .call();
4560  }
4561
4562  @Override
4563  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4564    return this.<Void> newMasterCaller()
4565      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4566        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4567          .setNewRsgroupName(newName).build(),
4568        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4569      .call();
4570  }
4571
4572  @Override
4573  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4574    Map<String, String> configuration) {
4575    UpdateRSGroupConfigRequest.Builder request =
4576      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4577    if (configuration != null) {
4578      configuration.entrySet().forEach(e -> request.addConfiguration(
4579        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4580    }
4581    return this.<Void> newMasterCaller()
4582      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4583        Void> call(controller, stub, request.build(),
4584          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4585      .call();
4586  }
4587
4588  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4589    return this.<List<LogEntry>> newMasterCaller()
4590      .action((controller, stub) -> this.call(controller, stub,
4591        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4592        ProtobufUtil::toBalancerDecisionResponse))
4593      .call();
4594  }
4595
4596  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4597    return this.<List<LogEntry>> newMasterCaller()
4598      .action((controller, stub) -> this.call(controller, stub,
4599        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4600        ProtobufUtil::toBalancerRejectionResponse))
4601      .call();
4602  }
4603
4604  @Override
4605  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4606    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4607    if (logType == null || serverType == null) {
4608      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4609    }
4610    switch (logType) {
4611      case "SLOW_LOG":
4612      case "LARGE_LOG":
4613        if (ServerType.MASTER.equals(serverType)) {
4614          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4615        }
4616        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4617      case "BALANCER_DECISION":
4618        if (ServerType.REGION_SERVER.equals(serverType)) {
4619          throw new IllegalArgumentException(
4620            "Balancer Decision logs are not maintained by HRegionServer");
4621        }
4622        return getBalancerDecisions(limit);
4623      case "BALANCER_REJECTION":
4624        if (ServerType.REGION_SERVER.equals(serverType)) {
4625          throw new IllegalArgumentException(
4626            "Balancer Rejection logs are not maintained by HRegionServer");
4627        }
4628        return getBalancerRejections(limit);
4629      default:
4630        return CompletableFuture.completedFuture(Collections.emptyList());
4631    }
4632  }
4633
4634  @Override
4635  public CompletableFuture<Void> flushMasterStore() {
4636    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4637    return this.<Void> newMasterCaller()
4638      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4639        Void> call(controller, stub, request.build(),
4640          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4641      .call();
4642  }
4643
4644  @Override
4645  public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
4646    GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
4647    return this.<List<String>> newAdminCaller()
4648      .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
4649        List<String>> adminCall(controller, stub, request.build(),
4650          (s, c, req, done) -> s.getCachedFilesList(c, req, done),
4651          resp -> resp.getCachedFilesList()))
4652      .serverName(serverName).call();
4653  }
4654
4655  @Override
4656  public CompletableFuture<Void> restoreBackupSystemTable(String snapshotName) {
4657    MasterProtos.RestoreBackupSystemTableRequest request =
4658      MasterProtos.RestoreBackupSystemTableRequest.newBuilder().setSnapshotName(snapshotName)
4659        .build();
4660    return this.<MasterProtos.RestoreBackupSystemTableRequest,
4661      MasterProtos.RestoreBackupSystemTableResponse> procedureCall(request,
4662        MasterService.Interface::restoreBackupSystemTable,
4663        MasterProtos.RestoreBackupSystemTableResponse::getProcId,
4664        new RestoreBackupSystemTableProcedureBiConsumer());
4665  }
4666}