001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
021import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
024
025import edu.umd.cs.findbugs.annotations.Nullable;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.EnumSet;
031import java.util.HashMap;
032import java.util.List;
033import java.util.Map;
034import java.util.Optional;
035import java.util.Set;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicReference;
041import java.util.function.BiConsumer;
042import java.util.function.Consumer;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import java.util.regex.Pattern;
046import java.util.stream.Collectors;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CacheEvictionStats;
050import org.apache.hadoop.hbase.CacheEvictionStatsAggregator;
051import org.apache.hadoop.hbase.CatalogFamilyFormat;
052import org.apache.hadoop.hbase.ClientMetaTableAccessor;
053import org.apache.hadoop.hbase.ClusterMetrics;
054import org.apache.hadoop.hbase.ClusterMetrics.Option;
055import org.apache.hadoop.hbase.ClusterMetricsBuilder;
056import org.apache.hadoop.hbase.DoNotRetryIOException;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.NamespaceDescriptor;
060import org.apache.hadoop.hbase.RegionLocations;
061import org.apache.hadoop.hbase.RegionMetrics;
062import org.apache.hadoop.hbase.RegionMetricsBuilder;
063import org.apache.hadoop.hbase.ServerName;
064import org.apache.hadoop.hbase.TableExistsException;
065import org.apache.hadoop.hbase.TableName;
066import org.apache.hadoop.hbase.TableNotDisabledException;
067import org.apache.hadoop.hbase.TableNotEnabledException;
068import org.apache.hadoop.hbase.TableNotFoundException;
069import org.apache.hadoop.hbase.UnknownRegionException;
070import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
071import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
072import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
073import org.apache.hadoop.hbase.client.Scan.ReadType;
074import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
075import org.apache.hadoop.hbase.client.replication.TableCFs;
076import org.apache.hadoop.hbase.client.security.SecurityCapability;
077import org.apache.hadoop.hbase.exceptions.DeserializationException;
078import org.apache.hadoop.hbase.ipc.HBaseRpcController;
079import org.apache.hadoop.hbase.net.Address;
080import org.apache.hadoop.hbase.quotas.QuotaFilter;
081import org.apache.hadoop.hbase.quotas.QuotaSettings;
082import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
083import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
084import org.apache.hadoop.hbase.replication.ReplicationException;
085import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
086import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
087import org.apache.hadoop.hbase.replication.SyncReplicationState;
088import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
089import org.apache.hadoop.hbase.security.access.GetUserPermissionsRequest;
090import org.apache.hadoop.hbase.security.access.Permission;
091import org.apache.hadoop.hbase.security.access.ShadedAccessControlUtil;
092import org.apache.hadoop.hbase.security.access.UserPermission;
093import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
094import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
095import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
096import org.apache.hadoop.hbase.util.Bytes;
097import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
098import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
099import org.apache.hadoop.hbase.util.Pair;
100import org.apache.hadoop.hbase.util.Strings;
101import org.apache.yetus.audience.InterfaceAudience;
102import org.slf4j.Logger;
103import org.slf4j.LoggerFactory;
104
105import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
106import org.apache.hbase.thirdparty.com.google.protobuf.Message;
107import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
108import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
109import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
110import org.apache.hbase.thirdparty.io.netty.util.Timeout;
111import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
112import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
113
114import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
115import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GrantResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
161import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
162import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
163import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
164import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
165import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
166import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
167import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
168import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
169import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
170import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
171import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
172import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
173import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
174import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
175import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
176import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
177import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
178import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
179import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
180import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
181import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
182import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
183import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
184import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
185import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
186import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
187import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
188import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest;
200import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresResponse;
201import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
202import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
203import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
204import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
205import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
206import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
207import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
208import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
209import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest;
210import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse;
211import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
212import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
213import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
214import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
215import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
216import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
217import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
218import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
219import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotCleanupEnabledResponse;
220import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
221import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
222import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
223import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
224import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
227import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
228import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
231import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
232import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateRequest;
233import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByStateResponse;
234import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
235import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
236import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateRequest;
237import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByStateResponse;
238import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
239import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
240import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
241import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
242import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
245import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
246import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
247import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
248import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
249import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
250import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
251import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
252import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
253import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
254import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
255import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
256import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
257import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
258import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
259import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
260import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
261import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
262import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
263import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
264import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
265import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
266import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
267import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
268import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
269import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
270import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
271import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
272import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
273import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse;
274import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
275import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
276import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
277import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
278import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSnapshotCleanupResponse;
279import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
280import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
281import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
282import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
283import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
284import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
285import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
286import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
287import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
288import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
289import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
290import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
291import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
292import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
293import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
294import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
295import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
296import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
297import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
298import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
299import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
300import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
301import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
302import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
303import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
304import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
305import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
306import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
307import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
308import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
309import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupRequest;
310import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetConfiguredNamespacesAndTablesInRSGroupResponse;
311import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
312import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
313import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
314import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
315import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
316import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
317import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
318import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
319import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupRequest;
320import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.ListTablesInRSGroupResponse;
321import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
322import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
323import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
324import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
325import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
326import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
327import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupRequest;
328import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.RenameRSGroupResponse;
329import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
330import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
331import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
332import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
333import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
334import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
335import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
336import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
337import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
338import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
339import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
340import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
341import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
342import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
343import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
344import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
345import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
346import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
347import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
348import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
349import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
350import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
351import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
352import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
353import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
354import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
355import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
356
357/**
358 * The implementation of AsyncAdmin.
359 * <p>
360 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
361 * be finished inside the rpc framework thread, which means that the callbacks registered to the
362 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
363 * this class should not try to do time consuming tasks in the callbacks.
364 * @since 2.0.0
365 * @see AsyncHBaseAdmin
366 * @see AsyncConnection#getAdmin()
367 * @see AsyncConnection#getAdminBuilder()
368 */
369@InterfaceAudience.Private
370class RawAsyncHBaseAdmin implements AsyncAdmin {
371
372  public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
373
374  private static final Logger LOG = LoggerFactory.getLogger(AsyncHBaseAdmin.class);
375
376  private final AsyncConnectionImpl connection;
377
378  private final HashedWheelTimer retryTimer;
379
380  private final AsyncTable<AdvancedScanResultConsumer> metaTable;
381
382  private final long rpcTimeoutNs;
383
384  private final long operationTimeoutNs;
385
386  private final long pauseNs;
387
388  private final long pauseNsForServerOverloaded;
389
390  private final int maxAttempts;
391
392  private final int startLogErrorsCnt;
393
394  private final NonceGenerator ng;
395
396  RawAsyncHBaseAdmin(AsyncConnectionImpl connection, HashedWheelTimer retryTimer,
397    AsyncAdminBuilderBase builder) {
398    this.connection = connection;
399    this.retryTimer = retryTimer;
400    this.metaTable = connection.getTable(META_TABLE_NAME);
401    this.rpcTimeoutNs = builder.rpcTimeoutNs;
402    this.operationTimeoutNs = builder.operationTimeoutNs;
403    this.pauseNs = builder.pauseNs;
404    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
405      LOG.warn(
406        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
407          + " the normal pause value {} ms, use the greater one instead",
408        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
409        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
410      this.pauseNsForServerOverloaded = builder.pauseNs;
411    } else {
412      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
413    }
414    this.maxAttempts = builder.maxAttempts;
415    this.startLogErrorsCnt = builder.startLogErrorsCnt;
416    this.ng = connection.getNonceGenerator();
417  }
418
419  <T> MasterRequestCallerBuilder<T> newMasterCaller() {
420    return this.connection.callerFactory.<T> masterRequest()
421      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
422      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
423      .pause(pauseNs, TimeUnit.NANOSECONDS)
424      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
425      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
426  }
427
428  private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
429    return this.connection.callerFactory.<T> adminRequest()
430      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
431      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
432      .pause(pauseNs, TimeUnit.NANOSECONDS)
433      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
434      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
435  }
436
  /**
   * Adapter for one asynchronous rpc method of {@link MasterService.Interface}.
   * @param <RESP> response type handed to the callback
   * @param <REQ>  request type sent to the master
   */
  @FunctionalInterface
  private interface MasterRpcCall<RESP, REQ> {
    /** Issues the rpc on {@code stub} with the given controller and request; {@code done} gets the response. */
    void call(MasterService.Interface stub, HBaseRpcController controller, REQ req,
      RpcCallback<RESP> done);
  }
442
  /**
   * Adapter for one asynchronous rpc method of {@link AdminService.Interface}.
   * @param <RESP> response type handed to the callback
   * @param <REQ>  request type sent to the server
   */
  @FunctionalInterface
  private interface AdminRpcCall<RESP, REQ> {
    /** Issues the rpc on {@code stub} with the given controller and request; {@code done} gets the response. */
    void call(AdminService.Interface stub, HBaseRpcController controller, REQ req,
      RpcCallback<RESP> done);
  }
448
  /**
   * Converts an rpc response into the value a caller's future is completed with.
   * @param <D> destination type
   * @param <S> source type
   */
  @FunctionalInterface
  private interface Converter<D, S> {
    /**
     * Converts {@code src} to the destination type.
     * @throws IOException if the conversion fails; callers in this class turn it into a failed
     *                     future
     */
    D convert(S src) throws IOException;
  }
453
454  private <PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
455    MasterService.Interface stub, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall,
456    Converter<RESP, PRESP> respConverter) {
457    CompletableFuture<RESP> future = new CompletableFuture<>();
458    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
459
460      @Override
461      public void run(PRESP resp) {
462        if (controller.failed()) {
463          future.completeExceptionally(controller.getFailed());
464        } else {
465          try {
466            future.complete(respConverter.convert(resp));
467          } catch (IOException e) {
468            future.completeExceptionally(e);
469          }
470        }
471      }
472    });
473    return future;
474  }
475
476  private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller,
477    AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall,
478    Converter<RESP, PRESP> respConverter) {
479    CompletableFuture<RESP> future = new CompletableFuture<>();
480    rpcCall.call(stub, controller, preq, new RpcCallback<PRESP>() {
481
482      @Override
483      public void run(PRESP resp) {
484        if (controller.failed()) {
485          future.completeExceptionally(controller.getFailed());
486        } else {
487          try {
488            future.complete(respConverter.convert(resp));
489          } catch (IOException e) {
490            future.completeExceptionally(e);
491          }
492        }
493      }
494    });
495    return future;
496  }
497
498  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
499    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
500    ProcedureBiConsumer consumer) {
501    return procedureCall(b -> {
502    }, preq, rpcCall, respConverter, consumer);
503  }
504
505  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq,
506    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
507    ProcedureBiConsumer consumer) {
508    return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer);
509  }
510
511  private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
512    Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
513    MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
514    ProcedureBiConsumer consumer) {
515    MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller,
516      stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter));
517    prioritySetter.accept(builder);
518    CompletableFuture<Long> procFuture = builder.call();
519    CompletableFuture<Void> future = waitProcedureResult(procFuture);
520    addListener(future, consumer);
521    return future;
522  }
523
524  @Override
525  public CompletableFuture<Boolean> tableExists(TableName tableName) {
526    if (TableName.isMetaTableName(tableName)) {
527      return CompletableFuture.completedFuture(true);
528    }
529    return ClientMetaTableAccessor.tableExists(metaTable, tableName);
530  }
531
532  @Override
533  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(boolean includeSysTables) {
534    return getTableDescriptors(
535      RequestConverter.buildGetTableDescriptorsRequest(null, includeSysTables));
536  }
537
538  /**
539   * {@link #listTableDescriptors(boolean)}
540   */
541  @Override
542  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(Pattern pattern,
543    boolean includeSysTables) {
544    Preconditions.checkNotNull(pattern, "pattern is null. If you don't specify a pattern, "
545      + "use listTableDescriptors(boolean) instead");
546    return getTableDescriptors(
547      RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables));
548  }
549
550  @Override
551  public CompletableFuture<List<TableDescriptor>> listTableDescriptors(List<TableName> tableNames) {
552    Preconditions.checkNotNull(tableNames, "tableNames is null. If you don't specify tableNames, "
553      + "use listTableDescriptors(boolean) instead");
554    if (tableNames.isEmpty()) {
555      return CompletableFuture.completedFuture(Collections.emptyList());
556    }
557    return getTableDescriptors(RequestConverter.buildGetTableDescriptorsRequest(tableNames));
558  }
559
560  private CompletableFuture<List<TableDescriptor>>
561    getTableDescriptors(GetTableDescriptorsRequest request) {
562    return this.<List<TableDescriptor>> newMasterCaller()
563      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
564        List<TableDescriptor>> call(controller, stub, request,
565          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
566          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
567      .call();
568  }
569
570  @Override
571  public CompletableFuture<List<TableName>> listTableNames(boolean includeSysTables) {
572    return getTableNames(RequestConverter.buildGetTableNamesRequest(null, includeSysTables));
573  }
574
575  @Override
576  public CompletableFuture<List<TableName>> listTableNames(Pattern pattern,
577    boolean includeSysTables) {
578    Preconditions.checkNotNull(pattern,
579      "pattern is null. If you don't specify a pattern, use listTableNames(boolean) instead");
580    return getTableNames(RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables));
581  }
582
583  @Override
584  public CompletableFuture<List<TableName>> listTableNamesByState(boolean isEnabled) {
585    return this.<List<TableName>> newMasterCaller()
586      .action((controller, stub) -> this.<ListTableNamesByStateRequest,
587        ListTableNamesByStateResponse, List<TableName>> call(controller, stub,
588          ListTableNamesByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
589          (s, c, req, done) -> s.listTableNamesByState(c, req, done),
590          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
591      .call();
592  }
593
594  private CompletableFuture<List<TableName>> getTableNames(GetTableNamesRequest request) {
595    return this.<List<TableName>> newMasterCaller()
596      .action((controller, stub) -> this.<GetTableNamesRequest, GetTableNamesResponse,
597        List<TableName>> call(controller, stub, request,
598          (s, c, req, done) -> s.getTableNames(c, req, done),
599          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList())))
600      .call();
601  }
602
603  @Override
604  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByNamespace(String name) {
605    return this.<List<TableDescriptor>> newMasterCaller()
606      .action((controller, stub) -> this.<ListTableDescriptorsByNamespaceRequest,
607        ListTableDescriptorsByNamespaceResponse, List<TableDescriptor>> call(controller, stub,
608          ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
609          (s, c, req, done) -> s.listTableDescriptorsByNamespace(c, req, done),
610          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
611      .call();
612  }
613
614  @Override
615  public CompletableFuture<List<TableDescriptor>> listTableDescriptorsByState(boolean isEnabled) {
616    return this.<List<TableDescriptor>> newMasterCaller()
617      .action((controller, stub) -> this.<ListTableDescriptorsByStateRequest,
618        ListTableDescriptorsByStateResponse, List<TableDescriptor>> call(controller, stub,
619          ListTableDescriptorsByStateRequest.newBuilder().setIsEnabled(isEnabled).build(),
620          (s, c, req, done) -> s.listTableDescriptorsByState(c, req, done),
621          (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
622      .call();
623  }
624
625  @Override
626  public CompletableFuture<List<TableName>> listTableNamesByNamespace(String name) {
627    return this.<List<TableName>> newMasterCaller()
628      .action((controller, stub) -> this.<ListTableNamesByNamespaceRequest,
629        ListTableNamesByNamespaceResponse, List<TableName>> call(controller, stub,
630          ListTableNamesByNamespaceRequest.newBuilder().setNamespaceName(name).build(),
631          (s, c, req, done) -> s.listTableNamesByNamespace(c, req, done),
632          (resp) -> ProtobufUtil.toTableNameList(resp.getTableNameList())))
633      .call();
634  }
635
636  @Override
637  public CompletableFuture<TableDescriptor> getDescriptor(TableName tableName) {
638    CompletableFuture<TableDescriptor> future = new CompletableFuture<>();
639    addListener(this.<List<TableSchema>> newMasterCaller().priority(tableName)
640      .action((controller, stub) -> this.<GetTableDescriptorsRequest, GetTableDescriptorsResponse,
641        List<TableSchema>> call(controller, stub,
642          RequestConverter.buildGetTableDescriptorsRequest(tableName),
643          (s, c, req, done) -> s.getTableDescriptors(c, req, done),
644          (resp) -> resp.getTableSchemaList()))
645      .call(), (tableSchemas, error) -> {
646        if (error != null) {
647          future.completeExceptionally(error);
648          return;
649        }
650        if (!tableSchemas.isEmpty()) {
651          future.complete(ProtobufUtil.toTableDescriptor(tableSchemas.get(0)));
652        } else {
653          future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
654        }
655      });
656    return future;
657  }
658
659  @Override
660  public CompletableFuture<Void> createTable(TableDescriptor desc) {
661    return createTable(desc.getTableName(),
662      RequestConverter.buildCreateTableRequest(desc, null, ng.getNonceGroup(), ng.newNonce()));
663  }
664
665  @Override
666  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[] startKey, byte[] endKey,
667    int numRegions) {
668    try {
669      return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
670    } catch (IllegalArgumentException e) {
671      return failedFuture(e);
672    }
673  }
674
675  @Override
676  public CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys) {
677    Preconditions.checkNotNull(splitKeys, "splitKeys is null. If you don't specify splitKeys,"
678      + " use createTable(TableDescriptor) instead");
679    try {
680      verifySplitKeys(splitKeys);
681      return createTable(desc.getTableName(), RequestConverter.buildCreateTableRequest(desc,
682        splitKeys, ng.getNonceGroup(), ng.newNonce()));
683    } catch (IllegalArgumentException e) {
684      return failedFuture(e);
685    }
686  }
687
688  private CompletableFuture<Void> createTable(TableName tableName, CreateTableRequest request) {
689    Preconditions.checkNotNull(tableName, "table name is null");
690    return this.<CreateTableRequest, CreateTableResponse> procedureCall(tableName, request,
691      (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
692      new CreateTableProcedureBiConsumer(tableName));
693  }
694
695  @Override
696  public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
697    return modifyTable(desc, true);
698  }
699
700  @Override
701  public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions) {
702    return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(desc.getTableName(),
703      RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
704        ng.newNonce(), reopenRegions),
705      (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(),
706      new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
707  }
708
709  @Override
710  public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
711    return this.<ModifyTableStoreFileTrackerRequest,
712      ModifyTableStoreFileTrackerResponse> procedureCall(tableName,
713        RequestConverter.buildModifyTableStoreFileTrackerRequest(tableName, dstSFT,
714          ng.getNonceGroup(), ng.newNonce()),
715        (s, c, req, done) -> s.modifyTableStoreFileTracker(c, req, done),
716        (resp) -> resp.getProcId(),
717        new ModifyTableStoreFileTrackerProcedureBiConsumer(this, tableName));
718  }
719
720  @Override
721  public CompletableFuture<Void> deleteTable(TableName tableName) {
722    return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(tableName,
723      RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
724      (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
725      new DeleteTableProcedureBiConsumer(tableName));
726  }
727
728  @Override
729  public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
730    return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(tableName,
731      RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
732        ng.newNonce()),
733      (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(),
734      new TruncateTableProcedureBiConsumer(tableName));
735  }
736
737  @Override
738  public CompletableFuture<Void> enableTable(TableName tableName) {
739    return this.<EnableTableRequest, EnableTableResponse> procedureCall(tableName,
740      RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
741      (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
742      new EnableTableProcedureBiConsumer(tableName));
743  }
744
745  @Override
746  public CompletableFuture<Void> disableTable(TableName tableName) {
747    return this.<DisableTableRequest, DisableTableResponse> procedureCall(tableName,
748      RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
749      (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
750      new DisableTableProcedureBiConsumer(tableName));
751  }
752
753  /**
754   * Utility for completing passed TableState {@link CompletableFuture} <code>future</code> using
755   * passed parameters. Sets error or boolean result ('true' if table matches the passed-in
756   * targetState).
757   */
758  private static CompletableFuture<Boolean> completeCheckTableState(
759    CompletableFuture<Boolean> future, TableState tableState, Throwable error,
760    TableState.State targetState, TableName tableName) {
761    if (error != null) {
762      future.completeExceptionally(error);
763    } else {
764      if (tableState != null) {
765        future.complete(tableState.inStates(targetState));
766      } else {
767        future.completeExceptionally(new TableNotFoundException(tableName));
768      }
769    }
770    return future;
771  }
772
773  @Override
774  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
775    if (TableName.isMetaTableName(tableName)) {
776      return CompletableFuture.completedFuture(true);
777    }
778    CompletableFuture<Boolean> future = new CompletableFuture<>();
779    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
780      (tableState, error) -> {
781        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
782          TableState.State.ENABLED, tableName);
783      });
784    return future;
785  }
786
787  @Override
788  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
789    if (TableName.isMetaTableName(tableName)) {
790      return CompletableFuture.completedFuture(false);
791    }
792    CompletableFuture<Boolean> future = new CompletableFuture<>();
793    addListener(ClientMetaTableAccessor.getTableState(metaTable, tableName),
794      (tableState, error) -> {
795        completeCheckTableState(future, tableState.isPresent() ? tableState.get() : null, error,
796          TableState.State.DISABLED, tableName);
797      });
798    return future;
799  }
800
801  @Override
802  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
803    if (TableName.isMetaTableName(tableName)) {
804      return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
805        .of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
806    }
807    CompletableFuture<Boolean> future = new CompletableFuture<>();
808    addListener(isTableEnabled(tableName), (enabled, error) -> {
809      if (error != null) {
810        if (error instanceof TableNotFoundException) {
811          future.complete(false);
812        } else {
813          future.completeExceptionally(error);
814        }
815        return;
816      }
817      if (!enabled) {
818        future.complete(false);
819      } else {
820        addListener(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
821          (locations, error1) -> {
822            if (error1 != null) {
823              future.completeExceptionally(error1);
824              return;
825            }
826            List<HRegionLocation> notDeployedRegions = locations.stream()
827              .filter(loc -> loc.getServerName() == null).collect(Collectors.toList());
828            if (notDeployedRegions.size() > 0) {
829              if (LOG.isDebugEnabled()) {
830                LOG.debug("Table " + tableName + " has " + notDeployedRegions.size() + " regions");
831              }
832              future.complete(false);
833              return;
834            }
835            future.complete(true);
836          });
837      }
838    });
839    return future;
840  }
841
842  @Override
843  public CompletableFuture<Void> addColumnFamily(TableName tableName,
844    ColumnFamilyDescriptor columnFamily) {
845    return this.<AddColumnRequest, AddColumnResponse> procedureCall(tableName,
846      RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
847        ng.newNonce()),
848      (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
849      new AddColumnFamilyProcedureBiConsumer(tableName));
850  }
851
852  @Override
853  public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
854    return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(tableName,
855      RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
856        ng.newNonce()),
857      (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(),
858      new DeleteColumnFamilyProcedureBiConsumer(tableName));
859  }
860
861  @Override
862  public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
863    ColumnFamilyDescriptor columnFamily) {
864    return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(tableName,
865      RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
866        ng.newNonce()),
867      (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(),
868      new ModifyColumnFamilyProcedureBiConsumer(tableName));
869  }
870
871  @Override
872  public CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
873    byte[] family, String dstSFT) {
874    return this.<ModifyColumnStoreFileTrackerRequest,
875      ModifyColumnStoreFileTrackerResponse> procedureCall(tableName,
876        RequestConverter.buildModifyColumnStoreFileTrackerRequest(tableName, family, dstSFT,
877          ng.getNonceGroup(), ng.newNonce()),
878        (s, c, req, done) -> s.modifyColumnStoreFileTracker(c, req, done),
879        (resp) -> resp.getProcId(),
880        new ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(tableName));
881  }
882
883  @Override
884  public CompletableFuture<Void> createNamespace(NamespaceDescriptor descriptor) {
885    return this.<CreateNamespaceRequest, CreateNamespaceResponse> procedureCall(
886      RequestConverter.buildCreateNamespaceRequest(descriptor),
887      (s, c, req, done) -> s.createNamespace(c, req, done), (resp) -> resp.getProcId(),
888      new CreateNamespaceProcedureBiConsumer(descriptor.getName()));
889  }
890
891  @Override
892  public CompletableFuture<Void> modifyNamespace(NamespaceDescriptor descriptor) {
893    return this.<ModifyNamespaceRequest, ModifyNamespaceResponse> procedureCall(
894      RequestConverter.buildModifyNamespaceRequest(descriptor),
895      (s, c, req, done) -> s.modifyNamespace(c, req, done), (resp) -> resp.getProcId(),
896      new ModifyNamespaceProcedureBiConsumer(descriptor.getName()));
897  }
898
899  @Override
900  public CompletableFuture<Void> deleteNamespace(String name) {
901    return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
902      RequestConverter.buildDeleteNamespaceRequest(name),
903      (s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
904      new DeleteNamespaceProcedureBiConsumer(name));
905  }
906
907  @Override
908  public CompletableFuture<NamespaceDescriptor> getNamespaceDescriptor(String name) {
909    return this.<NamespaceDescriptor> newMasterCaller()
910      .action((controller, stub) -> this.<GetNamespaceDescriptorRequest,
911        GetNamespaceDescriptorResponse, NamespaceDescriptor> call(controller, stub,
912          RequestConverter.buildGetNamespaceDescriptorRequest(name),
913          (s, c, req, done) -> s.getNamespaceDescriptor(c, req, done),
914          (resp) -> ProtobufUtil.toNamespaceDescriptor(resp.getNamespaceDescriptor())))
915      .call();
916  }
917
918  @Override
919  public CompletableFuture<List<String>> listNamespaces() {
920    return this.<List<String>> newMasterCaller()
921      .action((controller, stub) -> this.<ListNamespacesRequest, ListNamespacesResponse,
922        List<String>> call(controller, stub, ListNamespacesRequest.newBuilder().build(),
923          (s, c, req, done) -> s.listNamespaces(c, req, done),
924          (resp) -> resp.getNamespaceNameList()))
925      .call();
926  }
927
928  @Override
929  public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() {
930    return this.<List<NamespaceDescriptor>> newMasterCaller()
931      .action((controller, stub) -> this.<ListNamespaceDescriptorsRequest,
932        ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(controller, stub,
933          ListNamespaceDescriptorsRequest.newBuilder().build(),
934          (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done),
935          (resp) -> ProtobufUtil.toNamespaceDescriptorList(resp)))
936      .call();
937  }
938
939  @Override
940  public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
941    return this.<List<RegionInfo>> newAdminCaller()
942      .action((controller, stub) -> this.<GetOnlineRegionRequest, GetOnlineRegionResponse,
943        List<RegionInfo>> adminCall(controller, stub,
944          RequestConverter.buildGetOnlineRegionRequest(),
945          (s, c, req, done) -> s.getOnlineRegion(c, req, done),
946          resp -> ProtobufUtil.getRegionInfos(resp)))
947      .serverName(serverName).call();
948  }
949
950  @Override
951  public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
952    if (tableName.equals(META_TABLE_NAME)) {
953      return connection.registry.getMetaRegionLocations()
954        .thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
955          .collect(Collectors.toList()));
956    } else {
957      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName).thenApply(
958        locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
959    }
960  }
961
962  @Override
963  public CompletableFuture<Void> flush(TableName tableName) {
964    return flush(tableName, Collections.emptyList());
965  }
966
967  @Override
968  public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
969    return flush(tableName, Collections.singletonList(columnFamily));
970  }
971
972  @Override
973  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
974    // This is for keeping compatibility with old implementation.
975    // If the server version is lower than the client version, it's possible that the
976    // flushTable method is not present in the server side, if so, we need to fall back
977    // to the old implementation.
978    List<byte[]> columnFamilies = columnFamilyList.stream()
979      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
980    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
981      ng.getNonceGroup(), ng.newNonce());
982    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
983      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
984      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
985    CompletableFuture<Void> future = new CompletableFuture<>();
986    addListener(procFuture, (ret, error) -> {
987      if (error != null) {
988        if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) {
989          future.completeExceptionally(error);
990        } else if (error instanceof DoNotRetryIOException) {
991          // usually this is caused by the method is not present on the server or
992          // the hbase hadoop version does not match the running hadoop version.
993          // if that happens, we need fall back to the old flush implementation.
994          LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
995          legacyFlush(future, tableName, columnFamilies);
996        } else {
997          future.completeExceptionally(error);
998        }
999      } else {
1000        future.complete(ret);
1001      }
1002    });
1003    return future;
1004  }
1005
1006  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1007    List<byte[]> columnFamilies) {
1008    addListener(tableExists(tableName), (exists, err) -> {
1009      if (err != null) {
1010        future.completeExceptionally(err);
1011      } else if (!exists) {
1012        future.completeExceptionally(new TableNotFoundException(tableName));
1013      } else {
1014        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
1015          if (err2 != null) {
1016            future.completeExceptionally(err2);
1017          } else if (!tableEnabled) {
1018            future.completeExceptionally(new TableNotEnabledException(tableName));
1019          } else {
1020            Map<String, String> props = new HashMap<>();
1021            if (columnFamilies != null && !columnFamilies.isEmpty()) {
1022              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1023                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
1024            }
1025            addListener(
1026              execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
1027              (ret, err3) -> {
1028                if (err3 != null) {
1029                  future.completeExceptionally(err3);
1030                } else {
1031                  future.complete(ret);
1032                }
1033              });
1034          }
1035        });
1036      }
1037    });
1038  }
1039
1040  @Override
1041  public CompletableFuture<Void> flushRegion(byte[] regionName) {
1042    return flushRegionInternal(regionName, null, false).thenAccept(r -> {
1043    });
1044  }
1045
1046  @Override
1047  public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
1048    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1049      + "If you don't specify a columnFamily, use flushRegion(regionName) instead");
1050    return flushRegionInternal(regionName, columnFamily, false).thenAccept(r -> {
1051    });
1052  }
1053
1054  /**
1055   * This method is for internal use only, where we need the response of the flush.
1056   * <p/>
1057   * As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
1058   * API.
1059   */
1060  CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName, byte[] columnFamily,
1061    boolean writeFlushWALMarker) {
1062    CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
1063    addListener(getRegionLocation(regionName), (location, err) -> {
1064      if (err != null) {
1065        future.completeExceptionally(err);
1066        return;
1067      }
1068      ServerName serverName = location.getServerName();
1069      if (serverName == null) {
1070        future
1071          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1072        return;
1073      }
1074      addListener(flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
1075        (ret, err2) -> {
1076          if (err2 != null) {
1077            future.completeExceptionally(err2);
1078          } else {
1079            future.complete(ret);
1080          }
1081        });
1082    });
1083    return future;
1084  }
1085
1086  private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
1087    byte[] columnFamily, boolean writeFlushWALMarker) {
1088    return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
1089      .action((controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse,
1090        FlushRegionResponse> adminCall(controller, stub,
1091          RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), columnFamily,
1092            writeFlushWALMarker),
1093          (s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
1094      .call();
1095  }
1096
1097  @Override
1098  public CompletableFuture<Void> flushRegionServer(ServerName sn) {
1099    CompletableFuture<Void> future = new CompletableFuture<>();
1100    addListener(getRegions(sn), (hRegionInfos, err) -> {
1101      if (err != null) {
1102        future.completeExceptionally(err);
1103        return;
1104      }
1105      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1106      if (hRegionInfos != null) {
1107        hRegionInfos
1108          .forEach(region -> compactFutures.add(flush(sn, region, null, false).thenAccept(r -> {
1109          })));
1110      }
1111      addListener(CompletableFuture.allOf(
1112        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1113          if (err2 != null) {
1114            future.completeExceptionally(err2);
1115          } else {
1116            future.complete(ret);
1117          }
1118        });
1119    });
1120    return future;
1121  }
1122
1123  @Override
1124  public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
1125    return compact(tableName, null, false, compactType);
1126  }
1127
1128  @Override
1129  public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
1130    CompactType compactType) {
1131    Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
1132      + "If you don't specify a columnFamily, use compact(TableName) instead");
1133    return compact(tableName, columnFamily, false, compactType);
1134  }
1135
1136  @Override
1137  public CompletableFuture<Void> compactRegion(byte[] regionName) {
1138    return compactRegion(regionName, null, false);
1139  }
1140
1141  @Override
1142  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
1143    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1144      + " If you don't specify a columnFamily, use compactRegion(regionName) instead");
1145    return compactRegion(regionName, columnFamily, false);
1146  }
1147
1148  @Override
1149  public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
1150    return compact(tableName, null, true, compactType);
1151  }
1152
1153  @Override
1154  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
1155    CompactType compactType) {
1156    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1157      + "If you don't specify a columnFamily, use compact(TableName) instead");
1158    return compact(tableName, columnFamily, true, compactType);
1159  }
1160
1161  @Override
1162  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
1163    return compactRegion(regionName, null, true);
1164  }
1165
1166  @Override
1167  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
1168    Preconditions.checkNotNull(columnFamily, "columnFamily is null."
1169      + " If you don't specify a columnFamily, use majorCompactRegion(regionName) instead");
1170    return compactRegion(regionName, columnFamily, true);
1171  }
1172
1173  @Override
1174  public CompletableFuture<Void> compactRegionServer(ServerName sn) {
1175    return compactRegionServer(sn, false);
1176  }
1177
1178  @Override
1179  public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
1180    return compactRegionServer(sn, true);
1181  }
1182
1183  private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
1184    CompletableFuture<Void> future = new CompletableFuture<>();
1185    addListener(getRegions(sn), (hRegionInfos, err) -> {
1186      if (err != null) {
1187        future.completeExceptionally(err);
1188        return;
1189      }
1190      List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
1191      if (hRegionInfos != null) {
1192        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
1193      }
1194      addListener(CompletableFuture.allOf(
1195        compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
1196          if (err2 != null) {
1197            future.completeExceptionally(err2);
1198          } else {
1199            future.complete(ret);
1200          }
1201        });
1202    });
1203    return future;
1204  }
1205
1206  private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
1207    boolean major) {
1208    CompletableFuture<Void> future = new CompletableFuture<>();
1209    addListener(getRegionLocation(regionName), (location, err) -> {
1210      if (err != null) {
1211        future.completeExceptionally(err);
1212        return;
1213      }
1214      ServerName serverName = location.getServerName();
1215      if (serverName == null) {
1216        future
1217          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1218        return;
1219      }
1220      addListener(compact(location.getServerName(), location.getRegion(), major, columnFamily),
1221        (ret, err2) -> {
1222          if (err2 != null) {
1223            future.completeExceptionally(err2);
1224          } else {
1225            future.complete(ret);
1226          }
1227        });
1228    });
1229    return future;
1230  }
1231
1232  /**
1233   * List all region locations for the specific table.
1234   */
1235  private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
1236    if (TableName.META_TABLE_NAME.equals(tableName)) {
1237      CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
1238      addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
1239        if (err != null) {
1240          future.completeExceptionally(err);
1241        } else if (
1242          metaRegions == null || metaRegions.isEmpty()
1243            || metaRegions.getDefaultRegionLocation() == null
1244        ) {
1245          future.completeExceptionally(new IOException("meta region does not found"));
1246        } else {
1247          future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
1248        }
1249      });
1250      return future;
1251    } else {
1252      // For non-meta table, we fetch all locations by scanning hbase:meta table
1253      return ClientMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
1254    }
1255  }
1256
1257  /**
1258   * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
1259   */
1260  private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
1261    CompactType compactType) {
1262    CompletableFuture<Void> future = new CompletableFuture<>();
1263
1264    switch (compactType) {
1265      case MOB:
1266        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
1267          if (err != null) {
1268            future.completeExceptionally(err);
1269            return;
1270          }
1271          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
1272          addListener(compact(serverName, regionInfo, major, columnFamily), (ret, err2) -> {
1273            if (err2 != null) {
1274              future.completeExceptionally(err2);
1275            } else {
1276              future.complete(ret);
1277            }
1278          });
1279        });
1280        break;
1281      case NORMAL:
1282        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
1283          if (err != null) {
1284            future.completeExceptionally(err);
1285            return;
1286          }
1287          if (locations == null || locations.isEmpty()) {
1288            future.completeExceptionally(new TableNotFoundException(tableName));
1289          }
1290          CompletableFuture<?>[] compactFutures =
1291            locations.stream().filter(l -> l.getRegion() != null)
1292              .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
1293              .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
1294              .toArray(CompletableFuture<?>[]::new);
1295          // future complete unless all of the compact futures are completed.
1296          addListener(CompletableFuture.allOf(compactFutures), (ret, err2) -> {
1297            if (err2 != null) {
1298              future.completeExceptionally(err2);
1299            } else {
1300              future.complete(ret);
1301            }
1302          });
1303        });
1304        break;
1305      default:
1306        throw new IllegalArgumentException("Unknown compactType: " + compactType);
1307    }
1308    return future;
1309  }
1310
1311  /**
1312   * Compact the region at specific region server.
1313   */
1314  private CompletableFuture<Void> compact(final ServerName sn, final RegionInfo hri,
1315    final boolean major, byte[] columnFamily) {
1316    return this.<Void> newAdminCaller().serverName(sn)
1317      .action((controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse,
1318        Void> adminCall(controller, stub,
1319          RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, columnFamily),
1320          (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
1321      .call();
1322  }
1323
1324  private byte[] toEncodeRegionName(byte[] regionName) {
1325    return RegionInfo.isEncodedRegionName(regionName)
1326      ? regionName
1327      : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
1328  }
1329
1330  private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
1331    CompletableFuture<TableName> result) {
1332    addListener(getRegionLocation(encodeRegionName), (location, err) -> {
1333      if (err != null) {
1334        result.completeExceptionally(err);
1335        return;
1336      }
1337      RegionInfo regionInfo = location.getRegion();
1338      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1339        result.completeExceptionally(
1340          new IllegalArgumentException("Can't invoke merge on non-default regions directly"));
1341        return;
1342      }
1343      if (!tableName.compareAndSet(null, regionInfo.getTable())) {
1344        if (!tableName.get().equals(regionInfo.getTable())) {
1345          // tables of this two region should be same.
1346          result.completeExceptionally(
1347            new IllegalArgumentException("Cannot merge regions from two different tables "
1348              + tableName.get() + " and " + regionInfo.getTable()));
1349        } else {
1350          result.complete(tableName.get());
1351        }
1352      }
1353    });
1354  }
1355
1356  private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[][] encodedRegionNames) {
1357    AtomicReference<TableName> tableNameRef = new AtomicReference<>();
1358    CompletableFuture<TableName> future = new CompletableFuture<>();
1359    for (byte[] encodedRegionName : encodedRegionNames) {
1360      checkAndGetTableName(encodedRegionName, tableNameRef, future);
1361    }
1362    return future;
1363  }
1364
1365  @Override
1366  public CompletableFuture<Boolean> mergeSwitch(boolean enabled, boolean drainMerges) {
1367    return setSplitOrMergeOn(enabled, drainMerges, MasterSwitchType.MERGE);
1368  }
1369
1370  @Override
1371  public CompletableFuture<Boolean> isMergeEnabled() {
1372    return isSplitOrMergeOn(MasterSwitchType.MERGE);
1373  }
1374
1375  @Override
1376  public CompletableFuture<Boolean> splitSwitch(boolean enabled, boolean drainSplits) {
1377    return setSplitOrMergeOn(enabled, drainSplits, MasterSwitchType.SPLIT);
1378  }
1379
1380  @Override
1381  public CompletableFuture<Boolean> isSplitEnabled() {
1382    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
1383  }
1384
1385  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean enabled, boolean synchronous,
1386    MasterSwitchType switchType) {
1387    SetSplitOrMergeEnabledRequest request =
1388      RequestConverter.buildSetSplitOrMergeEnabledRequest(enabled, synchronous, switchType);
1389    return this.<Boolean> newMasterCaller()
1390      .action((controller, stub) -> this.<SetSplitOrMergeEnabledRequest,
1391        SetSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1392          (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req, done),
1393          (resp) -> resp.getPrevValueList().get(0)))
1394      .call();
1395  }
1396
1397  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
1398    IsSplitOrMergeEnabledRequest request =
1399      RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
1400    return this.<Boolean> newMasterCaller()
1401      .action((controller, stub) -> this.<IsSplitOrMergeEnabledRequest,
1402        IsSplitOrMergeEnabledResponse, Boolean> call(controller, stub, request,
1403          (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done), (resp) -> resp.getEnabled()))
1404      .call();
1405  }
1406
1407  @Override
1408  public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, boolean forcible) {
1409    if (nameOfRegionsToMerge.size() < 2) {
1410      return failedFuture(new IllegalArgumentException(
1411        "Can not merge only " + nameOfRegionsToMerge.size() + " region"));
1412    }
1413    CompletableFuture<Void> future = new CompletableFuture<>();
1414    byte[][] encodedNameOfRegionsToMerge =
1415      nameOfRegionsToMerge.stream().map(this::toEncodeRegionName).toArray(byte[][]::new);
1416
1417    addListener(checkRegionsAndGetTableName(encodedNameOfRegionsToMerge), (tableName, err) -> {
1418      if (err != null) {
1419        future.completeExceptionally(err);
1420        return;
1421      }
1422
1423      final MergeTableRegionsRequest request;
1424      try {
1425        request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
1426          forcible, ng.getNonceGroup(), ng.newNonce());
1427      } catch (DeserializationException e) {
1428        future.completeExceptionally(e);
1429        return;
1430      }
1431
1432      addListener(
1433        this.procedureCall(tableName, request, MasterService.Interface::mergeTableRegions,
1434          MergeTableRegionsResponse::getProcId, new MergeTableRegionProcedureBiConsumer(tableName)),
1435        (ret, err2) -> {
1436          if (err2 != null) {
1437            future.completeExceptionally(err2);
1438          } else {
1439            future.complete(ret);
1440          }
1441        });
1442    });
1443    return future;
1444  }
1445
1446  @Override
1447  public CompletableFuture<Void> split(TableName tableName) {
1448    CompletableFuture<Void> future = new CompletableFuture<>();
1449    addListener(tableExists(tableName), (exist, error) -> {
1450      if (error != null) {
1451        future.completeExceptionally(error);
1452        return;
1453      }
1454      if (!exist) {
1455        future.completeExceptionally(new TableNotFoundException(tableName));
1456        return;
1457      }
1458      addListener(metaTable
1459        .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)
1460          .withStartRow(ClientMetaTableAccessor.getTableStartRowForMeta(tableName,
1461            ClientMetaTableAccessor.QueryType.REGION))
1462          .withStopRow(ClientMetaTableAccessor.getTableStopRowForMeta(tableName,
1463            ClientMetaTableAccessor.QueryType.REGION))),
1464        (results, err2) -> {
1465          if (err2 != null) {
1466            future.completeExceptionally(err2);
1467            return;
1468          }
1469          if (results != null && !results.isEmpty()) {
1470            List<CompletableFuture<Void>> splitFutures = new ArrayList<>();
1471            for (Result r : results) {
1472              if (r.isEmpty() || CatalogFamilyFormat.getRegionInfo(r) == null) {
1473                continue;
1474              }
1475              RegionLocations rl = CatalogFamilyFormat.getRegionLocations(r);
1476              if (rl != null) {
1477                for (HRegionLocation h : rl.getRegionLocations()) {
1478                  if (h != null && h.getServerName() != null) {
1479                    RegionInfo hri = h.getRegion();
1480                    if (
1481                      hri == null || hri.isSplitParent()
1482                        || hri.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID
1483                    ) {
1484                      continue;
1485                    }
1486                    splitFutures.add(split(hri, null));
1487                  }
1488                }
1489              }
1490            }
1491            addListener(
1492              CompletableFuture
1493                .allOf(splitFutures.toArray(new CompletableFuture<?>[splitFutures.size()])),
1494              (ret, exception) -> {
1495                if (exception != null) {
1496                  future.completeExceptionally(exception);
1497                  return;
1498                }
1499                future.complete(ret);
1500              });
1501          } else {
1502            future.complete(null);
1503          }
1504        });
1505    });
1506    return future;
1507  }
1508
1509  @Override
1510  public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) {
1511    CompletableFuture<Void> result = new CompletableFuture<>();
1512    if (splitPoint == null) {
1513      return failedFuture(new IllegalArgumentException("splitPoint can not be null."));
1514    }
1515    addListener(connection.getRegionLocator(tableName).getRegionLocation(splitPoint, true),
1516      (loc, err) -> {
1517        if (err != null) {
1518          result.completeExceptionally(err);
1519        } else if (loc == null || loc.getRegion() == null) {
1520          result.completeExceptionally(new IllegalArgumentException(
1521            "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint)));
1522        } else {
1523          addListener(splitRegion(loc.getRegion().getRegionName(), splitPoint), (ret, err2) -> {
1524            if (err2 != null) {
1525              result.completeExceptionally(err2);
1526            } else {
1527              result.complete(ret);
1528            }
1529
1530          });
1531        }
1532      });
1533    return result;
1534  }
1535
1536  @Override
1537  public CompletableFuture<Void> splitRegion(byte[] regionName) {
1538    CompletableFuture<Void> future = new CompletableFuture<>();
1539    addListener(getRegionLocation(regionName), (location, err) -> {
1540      if (err != null) {
1541        future.completeExceptionally(err);
1542        return;
1543      }
1544      RegionInfo regionInfo = location.getRegion();
1545      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1546        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1547          + "Replicas are auto-split when their primary is split."));
1548        return;
1549      }
1550      ServerName serverName = location.getServerName();
1551      if (serverName == null) {
1552        future
1553          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1554        return;
1555      }
1556      addListener(split(regionInfo, null), (ret, err2) -> {
1557        if (err2 != null) {
1558          future.completeExceptionally(err2);
1559        } else {
1560          future.complete(ret);
1561        }
1562      });
1563    });
1564    return future;
1565  }
1566
1567  @Override
1568  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) {
1569    Preconditions.checkNotNull(splitPoint,
1570      "splitPoint is null. If you don't specify a splitPoint, use splitRegion(byte[]) instead");
1571    CompletableFuture<Void> future = new CompletableFuture<>();
1572    addListener(getRegionLocation(regionName), (location, err) -> {
1573      if (err != null) {
1574        future.completeExceptionally(err);
1575        return;
1576      }
1577      RegionInfo regionInfo = location.getRegion();
1578      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1579        future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. "
1580          + "Replicas are auto-split when their primary is split."));
1581        return;
1582      }
1583      ServerName serverName = location.getServerName();
1584      if (serverName == null) {
1585        future
1586          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1587        return;
1588      }
1589      if (
1590        regionInfo.getStartKey() != null
1591          && Bytes.compareTo(regionInfo.getStartKey(), splitPoint) == 0
1592      ) {
1593        future.completeExceptionally(
1594          new IllegalArgumentException("should not give a splitkey which equals to startkey!"));
1595        return;
1596      }
1597      addListener(split(regionInfo, splitPoint), (ret, err2) -> {
1598        if (err2 != null) {
1599          future.completeExceptionally(err2);
1600        } else {
1601          future.complete(ret);
1602        }
1603      });
1604    });
1605    return future;
1606  }
1607
1608  private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
1609    CompletableFuture<Void> future = new CompletableFuture<>();
1610    TableName tableName = hri.getTable();
1611    final SplitTableRegionRequest request;
1612    try {
1613      request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
1614        ng.newNonce());
1615    } catch (DeserializationException e) {
1616      future.completeExceptionally(e);
1617      return future;
1618    }
1619
1620    addListener(
1621      this.procedureCall(tableName, request, MasterService.Interface::splitRegion,
1622        SplitTableRegionResponse::getProcId, new SplitTableRegionProcedureBiConsumer(tableName)),
1623      (ret, err2) -> {
1624        if (err2 != null) {
1625          future.completeExceptionally(err2);
1626        } else {
1627          future.complete(ret);
1628        }
1629      });
1630    return future;
1631  }
1632
1633  @Override
1634  public CompletableFuture<Void> truncateRegion(byte[] regionName) {
1635    CompletableFuture<Void> future = new CompletableFuture<>();
1636    addListener(getRegionLocation(regionName), (location, err) -> {
1637      if (err != null) {
1638        future.completeExceptionally(err);
1639        return;
1640      }
1641      RegionInfo regionInfo = location.getRegion();
1642      if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
1643        future.completeExceptionally(new IllegalArgumentException(
1644          "Can't truncate replicas directly.Replicas are auto-truncated "
1645            + "when their primary is truncated."));
1646        return;
1647      }
1648      ServerName serverName = location.getServerName();
1649      if (serverName == null) {
1650        future
1651          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
1652        return;
1653      }
1654      addListener(truncateRegion(regionInfo), (ret, err2) -> {
1655        if (err2 != null) {
1656          future.completeExceptionally(err2);
1657        } else {
1658          future.complete(ret);
1659        }
1660      });
1661    });
1662    return future;
1663  }
1664
1665  private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
1666    CompletableFuture<Void> future = new CompletableFuture<>();
1667    TableName tableName = hri.getTable();
1668    final MasterProtos.TruncateRegionRequest request;
1669    try {
1670      request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
1671    } catch (DeserializationException e) {
1672      future.completeExceptionally(e);
1673      return future;
1674    }
1675    addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
1676      MasterProtos.TruncateRegionResponse::getProcId,
1677      new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
1678        if (err2 != null) {
1679          future.completeExceptionally(err2);
1680        } else {
1681          future.complete(ret);
1682        }
1683      });
1684    return future;
1685  }
1686
1687  @Override
1688  public CompletableFuture<Void> assign(byte[] regionName) {
1689    CompletableFuture<Void> future = new CompletableFuture<>();
1690    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1691      if (err != null) {
1692        future.completeExceptionally(err);
1693        return;
1694      }
1695      addListener(
1696        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1697          .action((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
1698            controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()),
1699            (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))
1700          .call(),
1701        (ret, err2) -> {
1702          if (err2 != null) {
1703            future.completeExceptionally(err2);
1704          } else {
1705            future.complete(ret);
1706          }
1707        });
1708    });
1709    return future;
1710  }
1711
1712  @Override
1713  public CompletableFuture<Void> unassign(byte[] regionName) {
1714    CompletableFuture<Void> future = new CompletableFuture<>();
1715    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1716      if (err != null) {
1717        future.completeExceptionally(err);
1718        return;
1719      }
1720      addListener(
1721        this.<Void> newMasterCaller().priority(regionInfo.getTable())
1722          .action((controller, stub) -> this.<UnassignRegionRequest, UnassignRegionResponse,
1723            Void> call(controller, stub,
1724              RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName()),
1725              (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))
1726          .call(),
1727        (ret, err2) -> {
1728          if (err2 != null) {
1729            future.completeExceptionally(err2);
1730          } else {
1731            future.complete(ret);
1732          }
1733        });
1734    });
1735    return future;
1736  }
1737
1738  @Override
1739  public CompletableFuture<Void> offline(byte[] regionName) {
1740    CompletableFuture<Void> future = new CompletableFuture<>();
1741    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1742      if (err != null) {
1743        future.completeExceptionally(err);
1744        return;
1745      }
1746      addListener(this.<Void> newMasterCaller().priority(regionInfo.getTable())
1747        .action((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
1748          controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()),
1749          (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))
1750        .call(), (ret, err2) -> {
1751          if (err2 != null) {
1752            future.completeExceptionally(err2);
1753          } else {
1754            future.complete(ret);
1755          }
1756        });
1757    });
1758    return future;
1759  }
1760
1761  @Override
1762  public CompletableFuture<Void> move(byte[] regionName) {
1763    CompletableFuture<Void> future = new CompletableFuture<>();
1764    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1765      if (err != null) {
1766        future.completeExceptionally(err);
1767        return;
1768      }
1769      addListener(
1770        moveRegion(regionInfo,
1771          RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)),
1772        (ret, err2) -> {
1773          if (err2 != null) {
1774            future.completeExceptionally(err2);
1775          } else {
1776            future.complete(ret);
1777          }
1778        });
1779    });
1780    return future;
1781  }
1782
1783  @Override
1784  public CompletableFuture<Void> move(byte[] regionName, ServerName destServerName) {
1785    Preconditions.checkNotNull(destServerName,
1786      "destServerName is null. If you don't specify a destServerName, use move(byte[]) instead");
1787    CompletableFuture<Void> future = new CompletableFuture<>();
1788    addListener(getRegionInfo(regionName), (regionInfo, err) -> {
1789      if (err != null) {
1790        future.completeExceptionally(err);
1791        return;
1792      }
1793      addListener(
1794        moveRegion(regionInfo, RequestConverter
1795          .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)),
1796        (ret, err2) -> {
1797          if (err2 != null) {
1798            future.completeExceptionally(err2);
1799          } else {
1800            future.complete(ret);
1801          }
1802        });
1803    });
1804    return future;
1805  }
1806
1807  private CompletableFuture<Void> moveRegion(RegionInfo regionInfo, MoveRegionRequest request) {
1808    return this.<Void> newMasterCaller().priority(regionInfo.getTable())
1809      .action(
1810        (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(controller,
1811          stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null))
1812      .call();
1813  }
1814
1815  @Override
1816  public CompletableFuture<Void> setQuota(QuotaSettings quota) {
1817    return this.<Void> newMasterCaller()
1818      .action((controller, stub) -> this.<SetQuotaRequest, SetQuotaResponse, Void> call(controller,
1819        stub, QuotaSettings.buildSetQuotaRequestProto(quota),
1820        (s, c, req, done) -> s.setQuota(c, req, done), (resp) -> null))
1821      .call();
1822  }
1823
1824  @Override
1825  public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
1826    CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
1827    Scan scan = QuotaTableUtil.makeScan(filter);
1828    this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build().scan(scan,
1829      new AdvancedScanResultConsumer() {
1830        List<QuotaSettings> settings = new ArrayList<>();
1831
1832        @Override
1833        public void onNext(Result[] results, ScanController controller) {
1834          for (Result result : results) {
1835            try {
1836              QuotaTableUtil.parseResultToCollection(result, settings);
1837            } catch (IOException e) {
1838              controller.terminate();
1839              future.completeExceptionally(e);
1840            }
1841          }
1842        }
1843
1844        @Override
1845        public void onError(Throwable error) {
1846          future.completeExceptionally(error);
1847        }
1848
1849        @Override
1850        public void onComplete() {
1851          future.complete(settings);
1852        }
1853      });
1854    return future;
1855  }
1856
1857  @Override
1858  public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig,
1859    boolean enabled) {
1860    return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
1861      RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
1862      (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1863      new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
1864  }
1865
1866  @Override
1867  public CompletableFuture<Void> removeReplicationPeer(String peerId) {
1868    return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall(
1869      RequestConverter.buildRemoveReplicationPeerRequest(peerId),
1870      (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1871      new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER"));
1872  }
1873
1874  @Override
1875  public CompletableFuture<Void> enableReplicationPeer(String peerId) {
1876    return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall(
1877      RequestConverter.buildEnableReplicationPeerRequest(peerId),
1878      (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1879      new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER"));
1880  }
1881
1882  @Override
1883  public CompletableFuture<Void> disableReplicationPeer(String peerId) {
1884    return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall(
1885      RequestConverter.buildDisableReplicationPeerRequest(peerId),
1886      (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
1887      new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER"));
1888  }
1889
1890  @Override
1891  public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
1892    return this.<ReplicationPeerConfig> newMasterCaller()
1893      .action((controller, stub) -> this.<GetReplicationPeerConfigRequest,
1894        GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(controller, stub,
1895          RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
1896          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
1897          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig())))
1898      .call();
1899  }
1900
1901  @Override
1902  public CompletableFuture<Void> updateReplicationPeerConfig(String peerId,
1903    ReplicationPeerConfig peerConfig) {
1904    return this.<UpdateReplicationPeerConfigRequest,
1905      UpdateReplicationPeerConfigResponse> procedureCall(
1906        RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig),
1907        (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done),
1908        (resp) -> resp.getProcId(),
1909        new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG"));
1910  }
1911
1912  @Override
1913  public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
1914    SyncReplicationState clusterState) {
1915    return this.<TransitReplicationPeerSyncReplicationStateRequest,
1916      TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
1917        RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
1918          clusterState),
1919        (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
1920        (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
1921          () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
1922  }
1923
1924  @Override
1925  public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
1926    Map<TableName, List<String>> tableCfs) {
1927    if (tableCfs == null) {
1928      return failedFuture(new ReplicationException("tableCfs is null"));
1929    }
1930
1931    CompletableFuture<Void> future = new CompletableFuture<Void>();
1932    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1933      if (!completeExceptionally(future, error)) {
1934        ReplicationPeerConfig newPeerConfig =
1935          ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
1936        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1937          if (!completeExceptionally(future, error)) {
1938            future.complete(result);
1939          }
1940        });
1941      }
1942    });
1943    return future;
1944  }
1945
1946  @Override
1947  public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
1948    Map<TableName, List<String>> tableCfs) {
1949    if (tableCfs == null) {
1950      return failedFuture(new ReplicationException("tableCfs is null"));
1951    }
1952
1953    CompletableFuture<Void> future = new CompletableFuture<Void>();
1954    addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
1955      if (!completeExceptionally(future, error)) {
1956        ReplicationPeerConfig newPeerConfig = null;
1957        try {
1958          newPeerConfig = ReplicationPeerConfigUtil
1959            .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
1960        } catch (ReplicationException e) {
1961          future.completeExceptionally(e);
1962          return;
1963        }
1964        addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
1965          if (!completeExceptionally(future, error)) {
1966            future.complete(result);
1967          }
1968        });
1969      }
1970    });
1971    return future;
1972  }
1973
1974  @Override
1975  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() {
1976    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(null));
1977  }
1978
1979  @Override
1980  public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) {
1981    Preconditions.checkNotNull(pattern,
1982      "pattern is null. If you don't specify a pattern, use listReplicationPeers() instead");
1983    return listReplicationPeers(RequestConverter.buildListReplicationPeersRequest(pattern));
1984  }
1985
1986  private CompletableFuture<List<ReplicationPeerDescription>>
1987    listReplicationPeers(ListReplicationPeersRequest request) {
1988    return this.<List<ReplicationPeerDescription>> newMasterCaller()
1989      .action((controller, stub) -> this.<ListReplicationPeersRequest, ListReplicationPeersResponse,
1990        List<ReplicationPeerDescription>> call(controller, stub, request,
1991          (s, c, req, done) -> s.listReplicationPeers(c, req, done),
1992          (resp) -> resp.getPeerDescList().stream()
1993            .map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
1994            .collect(Collectors.toList())))
1995      .call();
1996  }
1997
1998  @Override
1999  public CompletableFuture<List<TableCFs>> listReplicatedTableCFs() {
2000    CompletableFuture<List<TableCFs>> future = new CompletableFuture<List<TableCFs>>();
2001    addListener(listTableDescriptors(), (tables, error) -> {
2002      if (!completeExceptionally(future, error)) {
2003        List<TableCFs> replicatedTableCFs = new ArrayList<>();
2004        tables.forEach(table -> {
2005          Map<String, Integer> cfs = new HashMap<>();
2006          Stream.of(table.getColumnFamilies())
2007            .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
2008            .forEach(column -> {
2009              cfs.put(column.getNameAsString(), column.getScope());
2010            });
2011          if (!cfs.isEmpty()) {
2012            replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
2013          }
2014        });
2015        future.complete(replicatedTableCFs);
2016      }
2017    });
2018    return future;
2019  }
2020
2021  @Override
2022  public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
2023    SnapshotProtos.SnapshotDescription snapshot =
2024      ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
2025    try {
2026      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2027    } catch (IllegalArgumentException e) {
2028      return failedFuture(e);
2029    }
2030    CompletableFuture<Void> future = new CompletableFuture<>();
2031    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot)
2032      .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
2033    addListener(this.<SnapshotResponse> newMasterCaller()
2034      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(
2035        controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
2036      .call(), (resp, err) -> {
2037        if (err != null) {
2038          future.completeExceptionally(err);
2039          return;
2040        }
2041        waitSnapshotFinish(snapshotDesc, future, resp);
2042      });
2043    return future;
2044  }
2045
2046  // This is for keeping compatibility with old implementation.
2047  // If there is a procId field in the response, then the snapshot will be operated with a
2048  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
2049  private void waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future,
2050    SnapshotResponse resp) {
2051    if (resp.hasProcId()) {
2052      getProcedureResult(resp.getProcId(), future, 0);
2053      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
2054    } else {
2055      long expectedTimeout = resp.getExpectedTimeout();
2056      TimerTask pollingTask = new TimerTask() {
2057        int tries = 0;
2058        long startTime = EnvironmentEdgeManager.currentTime();
2059        long endTime = startTime + expectedTimeout;
2060        long maxPauseTime = expectedTimeout / maxAttempts;
2061
2062        @Override
2063        public void run(Timeout timeout) throws Exception {
2064          if (EnvironmentEdgeManager.currentTime() < endTime) {
2065            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
2066              if (err2 != null) {
2067                future.completeExceptionally(err2);
2068              } else if (done) {
2069                future.complete(null);
2070              } else {
2071                // retry again after pauseTime.
2072                long pauseTime =
2073                  ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2074                pauseTime = Math.min(pauseTime, maxPauseTime);
2075                AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
2076              }
2077            });
2078          } else {
2079            future
2080              .completeExceptionally(new SnapshotCreationException("Snapshot '" + snapshot.getName()
2081                + "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshot));
2082          }
2083        }
2084      };
2085      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2086    }
2087  }
2088
2089  @Override
2090  public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
2091    return this.<Boolean> newMasterCaller()
2092      .action((controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse,
2093        Boolean> call(controller, stub,
2094          IsSnapshotDoneRequest.newBuilder()
2095            .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2096          (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
2097      .call();
2098  }
2099
2100  @Override
2101  public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
2102    boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
2103      HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
2104      HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
2105    return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
2106  }
2107
2108  @Override
2109  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot,
2110    boolean restoreAcl) {
2111    CompletableFuture<Void> future = new CompletableFuture<>();
2112    addListener(listSnapshots(Pattern.compile(snapshotName)), (snapshotDescriptions, err) -> {
2113      if (err != null) {
2114        future.completeExceptionally(err);
2115        return;
2116      }
2117      TableName tableName = null;
2118      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
2119        for (SnapshotDescription snap : snapshotDescriptions) {
2120          if (snap.getName().equals(snapshotName)) {
2121            tableName = snap.getTableName();
2122            break;
2123          }
2124        }
2125      }
2126      if (tableName == null) {
2127        future.completeExceptionally(new RestoreSnapshotException(
2128          "Unable to find the table name for snapshot=" + snapshotName));
2129        return;
2130      }
2131      final TableName finalTableName = tableName;
2132      addListener(tableExists(finalTableName), (exists, err2) -> {
2133        if (err2 != null) {
2134          future.completeExceptionally(err2);
2135        } else if (!exists) {
2136          // if table does not exist, then just clone snapshot into new table.
2137          completeConditionalOnFuture(future,
2138            internalRestoreSnapshot(snapshotName, finalTableName, restoreAcl, null));
2139        } else {
2140          addListener(isTableDisabled(finalTableName), (disabled, err4) -> {
2141            if (err4 != null) {
2142              future.completeExceptionally(err4);
2143            } else if (!disabled) {
2144              future.completeExceptionally(new TableNotDisabledException(finalTableName));
2145            } else {
2146              completeConditionalOnFuture(future,
2147                restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot, restoreAcl));
2148            }
2149          });
2150        }
2151      });
2152    });
2153    return future;
2154  }
2155
2156  private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName,
2157    boolean takeFailSafeSnapshot, boolean restoreAcl) {
2158    if (takeFailSafeSnapshot) {
2159      CompletableFuture<Void> future = new CompletableFuture<>();
2160      // Step.1 Take a snapshot of the current state
2161      String failSafeSnapshotSnapshotNameFormat =
2162        this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
2163          HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
2164      final String failSafeSnapshotSnapshotName =
2165        failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
2166          .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
2167          .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
2168      LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2169      addListener(snapshot(failSafeSnapshotSnapshotName, tableName), (ret, err) -> {
2170        if (err != null) {
2171          future.completeExceptionally(err);
2172        } else {
2173          // Step.2 Restore snapshot
2174          addListener(internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null),
2175            (void2, err2) -> {
2176              if (err2 != null) {
2177                // Step.3.a Something went wrong during the restore and try to rollback.
2178                addListener(internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName,
2179                  restoreAcl, null), (void3, err3) -> {
2180                    if (err3 != null) {
2181                      future.completeExceptionally(err3);
2182                    } else {
2183                      String msg =
2184                        "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
2185                          + failSafeSnapshotSnapshotName + " succeeded.";
2186                      future.completeExceptionally(new RestoreSnapshotException(msg, err2));
2187                    }
2188                  });
2189              } else {
2190                // Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
2191                LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
2192                addListener(deleteSnapshot(failSafeSnapshotSnapshotName), (ret3, err3) -> {
2193                  if (err3 != null) {
2194                    LOG.error(
2195                      "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
2196                      err3);
2197                  }
2198                  future.complete(ret3);
2199                });
2200              }
2201            });
2202        }
2203      });
2204      return future;
2205    } else {
2206      return internalRestoreSnapshot(snapshotName, tableName, restoreAcl, null);
2207    }
2208  }
2209
2210  private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture,
2211    CompletableFuture<T> parentFuture) {
2212    addListener(parentFuture, (res, err) -> {
2213      if (err != null) {
2214        dependentFuture.completeExceptionally(err);
2215      } else {
2216        dependentFuture.complete(res);
2217      }
2218    });
2219  }
2220
2221  @Override
2222  public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName,
2223    boolean restoreAcl, String customSFT) {
2224    CompletableFuture<Void> future = new CompletableFuture<>();
2225    addListener(tableExists(tableName), (exists, err) -> {
2226      if (err != null) {
2227        future.completeExceptionally(err);
2228      } else if (exists) {
2229        future.completeExceptionally(new TableExistsException(tableName));
2230      } else {
2231        completeConditionalOnFuture(future,
2232          internalRestoreSnapshot(snapshotName, tableName, restoreAcl, customSFT));
2233      }
2234    });
2235    return future;
2236  }
2237
2238  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName,
2239    boolean restoreAcl, String customSFT) {
2240    SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder()
2241      .setName(snapshotName).setTable(tableName.getNameAsString()).build();
2242    try {
2243      ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
2244    } catch (IllegalArgumentException e) {
2245      return failedFuture(e);
2246    }
2247    RestoreSnapshotRequest.Builder builder =
2248      RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
2249        .setNonce(ng.newNonce()).setRestoreACL(restoreAcl);
2250    if (customSFT != null) {
2251      builder.setCustomSFT(customSFT);
2252    }
2253    return waitProcedureResult(this.<Long> newMasterCaller()
2254      .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse,
2255        Long> call(controller, stub, builder.build(),
2256          (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId()))
2257      .call());
2258  }
2259
2260  @Override
2261  public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
2262    return getCompletedSnapshots(null);
2263  }
2264
2265  @Override
2266  public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
2267    Preconditions.checkNotNull(pattern,
2268      "pattern is null. If you don't specify a pattern, use listSnapshots() instead");
2269    return getCompletedSnapshots(pattern);
2270  }
2271
2272  private CompletableFuture<List<SnapshotDescription>> getCompletedSnapshots(Pattern pattern) {
2273    return this.<List<SnapshotDescription>> newMasterCaller()
2274      .action((controller, stub) -> this.<GetCompletedSnapshotsRequest,
2275        GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(controller, stub,
2276          GetCompletedSnapshotsRequest.newBuilder().build(),
2277          (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
2278          resp -> ProtobufUtil.toSnapshotDescriptionList(resp, pattern)))
2279      .call();
2280  }
2281
2282  @Override
2283  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern) {
2284    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2285      + " If you don't specify a tableNamePattern, use listSnapshots() instead");
2286    return getCompletedSnapshots(tableNamePattern, null);
2287  }
2288
2289  @Override
2290  public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
2291    Pattern snapshotNamePattern) {
2292    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2293      + " If you don't specify a tableNamePattern, use listSnapshots(Pattern) instead");
2294    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2295      + " If you don't specify a snapshotNamePattern, use listTableSnapshots(Pattern) instead");
2296    return getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2297  }
2298
2299  private CompletableFuture<List<SnapshotDescription>>
2300    getCompletedSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) {
2301    CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
2302    addListener(listTableNames(tableNamePattern, false), (tableNames, err) -> {
2303      if (err != null) {
2304        future.completeExceptionally(err);
2305        return;
2306      }
2307      if (tableNames == null || tableNames.size() <= 0) {
2308        future.complete(Collections.emptyList());
2309        return;
2310      }
2311      addListener(getCompletedSnapshots(snapshotNamePattern), (snapshotDescList, err2) -> {
2312        if (err2 != null) {
2313          future.completeExceptionally(err2);
2314          return;
2315        }
2316        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
2317          future.complete(Collections.emptyList());
2318          return;
2319        }
2320        future.complete(snapshotDescList.stream()
2321          .filter(snap -> (snap != null && tableNames.contains(snap.getTableName())))
2322          .collect(Collectors.toList()));
2323      });
2324    });
2325    return future;
2326  }
2327
2328  @Override
2329  public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
2330    return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
2331  }
2332
2333  @Override
2334  public CompletableFuture<Void> deleteSnapshots() {
2335    return internalDeleteSnapshots(null, null);
2336  }
2337
2338  @Override
2339  public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
2340    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2341      + " If you don't specify a snapshotNamePattern, use deleteSnapshots() instead");
2342    return internalDeleteSnapshots(null, snapshotNamePattern);
2343  }
2344
2345  @Override
2346  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern) {
2347    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2348      + " If you don't specify a tableNamePattern, use deleteSnapshots() instead");
2349    return internalDeleteSnapshots(tableNamePattern, null);
2350  }
2351
2352  @Override
2353  public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
2354    Pattern snapshotNamePattern) {
2355    Preconditions.checkNotNull(tableNamePattern, "tableNamePattern is null."
2356      + " If you don't specify a tableNamePattern, use deleteSnapshots(Pattern) instead");
2357    Preconditions.checkNotNull(snapshotNamePattern, "snapshotNamePattern is null."
2358      + " If you don't specify a snapshotNamePattern, use deleteSnapshots(Pattern) instead");
2359    return internalDeleteSnapshots(tableNamePattern, snapshotNamePattern);
2360  }
2361
2362  private CompletableFuture<Void> internalDeleteSnapshots(Pattern tableNamePattern,
2363    Pattern snapshotNamePattern) {
2364    CompletableFuture<List<SnapshotDescription>> listSnapshotsFuture;
2365    if (tableNamePattern == null) {
2366      listSnapshotsFuture = getCompletedSnapshots(snapshotNamePattern);
2367    } else {
2368      listSnapshotsFuture = getCompletedSnapshots(tableNamePattern, snapshotNamePattern);
2369    }
2370    CompletableFuture<Void> future = new CompletableFuture<>();
2371    addListener(listSnapshotsFuture, (snapshotDescriptions, err) -> {
2372      if (err != null) {
2373        future.completeExceptionally(err);
2374        return;
2375      }
2376      if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
2377        future.complete(null);
2378        return;
2379      }
2380      addListener(CompletableFuture.allOf(snapshotDescriptions.stream()
2381        .map(this::internalDeleteSnapshot).toArray(CompletableFuture[]::new)), (v, e) -> {
2382          if (e != null) {
2383            future.completeExceptionally(e);
2384          } else {
2385            future.complete(v);
2386          }
2387        });
2388    });
2389    return future;
2390  }
2391
2392  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
2393    return this.<Void> newMasterCaller()
2394      .action((controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(
2395        controller, stub,
2396        DeleteSnapshotRequest.newBuilder()
2397          .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
2398        (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
2399      .call();
2400  }
2401
2402  @Override
2403  public CompletableFuture<Void> execProcedure(String signature, String instance,
2404    Map<String, String> props) {
2405    CompletableFuture<Void> future = new CompletableFuture<>();
2406    ProcedureDescription procDesc =
2407      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2408    addListener(this.<Long> newMasterCaller()
2409      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, Long> call(
2410        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(procDesc).build(),
2411        (s, c, req, done) -> s.execProcedure(c, req, done), resp -> resp.getExpectedTimeout()))
2412      .call(), (expectedTimeout, err) -> {
2413        if (err != null) {
2414          future.completeExceptionally(err);
2415          return;
2416        }
2417        TimerTask pollingTask = new TimerTask() {
2418          int tries = 0;
2419          long startTime = EnvironmentEdgeManager.currentTime();
2420          long endTime = startTime + expectedTimeout;
2421          long maxPauseTime = expectedTimeout / maxAttempts;
2422
2423          @Override
2424          public void run(Timeout timeout) throws Exception {
2425            if (EnvironmentEdgeManager.currentTime() < endTime) {
2426              addListener(isProcedureFinished(signature, instance, props), (done, err2) -> {
2427                if (err2 != null) {
2428                  future.completeExceptionally(err2);
2429                  return;
2430                }
2431                if (done) {
2432                  future.complete(null);
2433                } else {
2434                  // retry again after pauseTime.
2435                  long pauseTime =
2436                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
2437                  pauseTime = Math.min(pauseTime, maxPauseTime);
2438                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
2439                    TimeUnit.MICROSECONDS);
2440                }
2441              });
2442            } else {
2443              future.completeExceptionally(new IOException("Procedure '" + signature + " : "
2444                + instance + "' wasn't completed in expectedTime:" + expectedTimeout + " ms"));
2445            }
2446          }
2447        };
2448        // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
2449        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
2450      });
2451    return future;
2452  }
2453
2454  @Override
2455  public CompletableFuture<byte[]> execProcedureWithReturn(String signature, String instance,
2456    Map<String, String> props) {
2457    ProcedureDescription proDesc =
2458      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2459    return this.<byte[]> newMasterCaller()
2460      .action((controller, stub) -> this.<ExecProcedureRequest, ExecProcedureResponse, byte[]> call(
2461        controller, stub, ExecProcedureRequest.newBuilder().setProcedure(proDesc).build(),
2462        (s, c, req, done) -> s.execProcedureWithRet(c, req, done),
2463        resp -> resp.hasReturnData() ? resp.getReturnData().toByteArray() : null))
2464      .call();
2465  }
2466
2467  @Override
2468  public CompletableFuture<Boolean> isProcedureFinished(String signature, String instance,
2469    Map<String, String> props) {
2470    ProcedureDescription proDesc =
2471      ProtobufUtil.buildProcedureDescription(signature, instance, props);
2472    return this.<Boolean> newMasterCaller()
2473      .action(
2474        (controller, stub) -> this.<IsProcedureDoneRequest, IsProcedureDoneResponse, Boolean> call(
2475          controller, stub, IsProcedureDoneRequest.newBuilder().setProcedure(proDesc).build(),
2476          (s, c, req, done) -> s.isProcedureDone(c, req, done), resp -> resp.getDone()))
2477      .call();
2478  }
2479
2480  @Override
2481  public CompletableFuture<Boolean> abortProcedure(long procId, boolean mayInterruptIfRunning) {
2482    return this.<Boolean> newMasterCaller().action(
2483      (controller, stub) -> this.<AbortProcedureRequest, AbortProcedureResponse, Boolean> call(
2484        controller, stub, AbortProcedureRequest.newBuilder().setProcId(procId).build(),
2485        (s, c, req, done) -> s.abortProcedure(c, req, done), resp -> resp.getIsProcedureAborted()))
2486      .call();
2487  }
2488
2489  @Override
2490  public CompletableFuture<String> getProcedures() {
2491    return this.<String> newMasterCaller()
2492      .action((controller, stub) -> this.<GetProceduresRequest, GetProceduresResponse, String> call(
2493        controller, stub, GetProceduresRequest.newBuilder().build(),
2494        (s, c, req, done) -> s.getProcedures(c, req, done),
2495        resp -> ProtobufUtil.toProcedureJson(resp.getProcedureList())))
2496      .call();
2497  }
2498
2499  @Override
2500  public CompletableFuture<String> getLocks() {
2501    return this.<String> newMasterCaller()
2502      .action(
2503        (controller, stub) -> this.<GetLocksRequest, GetLocksResponse, String> call(controller,
2504          stub, GetLocksRequest.newBuilder().build(), (s, c, req, done) -> s.getLocks(c, req, done),
2505          resp -> ProtobufUtil.toLockJson(resp.getLockList())))
2506      .call();
2507  }
2508
2509  @Override
2510  public CompletableFuture<Void> decommissionRegionServers(List<ServerName> servers,
2511    boolean offload) {
2512    return this.<Void> newMasterCaller()
2513      .action((controller, stub) -> this.<DecommissionRegionServersRequest,
2514        DecommissionRegionServersResponse, Void> call(controller, stub,
2515          RequestConverter.buildDecommissionRegionServersRequest(servers, offload),
2516          (s, c, req, done) -> s.decommissionRegionServers(c, req, done), resp -> null))
2517      .call();
2518  }
2519
2520  @Override
2521  public CompletableFuture<List<ServerName>> listDecommissionedRegionServers() {
2522    return this.<List<ServerName>> newMasterCaller()
2523      .action((controller, stub) -> this.<ListDecommissionedRegionServersRequest,
2524        ListDecommissionedRegionServersResponse, List<ServerName>> call(controller, stub,
2525          ListDecommissionedRegionServersRequest.newBuilder().build(),
2526          (s, c, req, done) -> s.listDecommissionedRegionServers(c, req, done),
2527          resp -> resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
2528            .collect(Collectors.toList())))
2529      .call();
2530  }
2531
2532  @Override
2533  public CompletableFuture<Void> recommissionRegionServer(ServerName server,
2534    List<byte[]> encodedRegionNames) {
2535    return this.<Void> newMasterCaller()
2536      .action((controller, stub) -> this.<RecommissionRegionServerRequest,
2537        RecommissionRegionServerResponse, Void> call(controller, stub,
2538          RequestConverter.buildRecommissionRegionServerRequest(server, encodedRegionNames),
2539          (s, c, req, done) -> s.recommissionRegionServer(c, req, done), resp -> null))
2540      .call();
2541  }
2542
2543  /**
2544   * Get the region location for the passed region name. The region name may be a full region name
2545   * or encoded region name. If the region does not found, then it'll throw an
2546   * UnknownRegionException wrapped by a {@link CompletableFuture}
2547   * @param regionNameOrEncodedRegionName region name or encoded region name
2548   * @return region location, wrapped by a {@link CompletableFuture}
2549   */
2550  CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) {
2551    if (regionNameOrEncodedRegionName == null) {
2552      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2553    }
2554
2555    CompletableFuture<Optional<HRegionLocation>> future;
2556    if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
2557      String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
2558      if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
2559        // old format encodedName, should be meta region
2560        future = connection.registry.getMetaRegionLocations()
2561          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2562            .filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
2563      } else {
2564        future = ClientMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
2565          regionNameOrEncodedRegionName);
2566      }
2567    } else {
2568      // Not all regionNameOrEncodedRegionName here is going to be a valid region name,
2569      // it needs to throw out IllegalArgumentException in case tableName is passed in.
2570      RegionInfo regionInfo;
2571      try {
2572        regionInfo =
2573          CatalogFamilyFormat.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
2574      } catch (IOException ioe) {
2575        return failedFuture(new IllegalArgumentException(ioe.getMessage()));
2576      }
2577
2578      if (regionInfo.isMetaRegion()) {
2579        future = connection.registry.getMetaRegionLocations()
2580          .thenApply(locs -> Stream.of(locs.getRegionLocations())
2581            .filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
2582            .findFirst());
2583      } else {
2584        future =
2585          ClientMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName);
2586      }
2587    }
2588
2589    CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>();
2590    addListener(future, (location, err) -> {
2591      if (err != null) {
2592        returnedFuture.completeExceptionally(err);
2593        return;
2594      }
2595      if (!location.isPresent() || location.get().getRegion() == null) {
2596        returnedFuture.completeExceptionally(
2597          new UnknownRegionException("Invalid region name or encoded region name: "
2598            + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
2599      } else {
2600        returnedFuture.complete(location.get());
2601      }
2602    });
2603    return returnedFuture;
2604  }
2605
2606  /**
2607   * Get the region info for the passed region name. The region name may be a full region name or
2608   * encoded region name. If the region does not found, then it'll throw an UnknownRegionException
2609   * wrapped by a {@link CompletableFuture}
2610   * @return region info, wrapped by a {@link CompletableFuture}
2611   */
2612  private CompletableFuture<RegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) {
2613    if (regionNameOrEncodedRegionName == null) {
2614      return failedFuture(new IllegalArgumentException("Passed region name can't be null"));
2615    }
2616
2617    if (
2618      Bytes.equals(regionNameOrEncodedRegionName,
2619        RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName())
2620        || Bytes.equals(regionNameOrEncodedRegionName,
2621          RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())
2622    ) {
2623      return CompletableFuture.completedFuture(RegionInfoBuilder.FIRST_META_REGIONINFO);
2624    }
2625
2626    CompletableFuture<RegionInfo> future = new CompletableFuture<>();
2627    addListener(getRegionLocation(regionNameOrEncodedRegionName), (location, err) -> {
2628      if (err != null) {
2629        future.completeExceptionally(err);
2630      } else {
2631        future.complete(location.getRegion());
2632      }
2633    });
2634    return future;
2635  }
2636
2637  private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
2638    if (numRegions < 3) {
2639      throw new IllegalArgumentException("Must create at least three regions");
2640    } else if (Bytes.compareTo(startKey, endKey) >= 0) {
2641      throw new IllegalArgumentException("Start key must be smaller than end key");
2642    }
2643    if (numRegions == 3) {
2644      return new byte[][] { startKey, endKey };
2645    }
2646    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
2647    if (splitKeys == null || splitKeys.length != numRegions - 1) {
2648      throw new IllegalArgumentException("Unable to split key range into enough regions");
2649    }
2650    return splitKeys;
2651  }
2652
2653  private void verifySplitKeys(byte[][] splitKeys) {
2654    Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
2655    // Verify there are no duplicate split keys
2656    byte[] lastKey = null;
2657    for (byte[] splitKey : splitKeys) {
2658      if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
2659        throw new IllegalArgumentException("Empty split key must not be passed in the split keys.");
2660      }
2661      if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
2662        throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: "
2663          + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey));
2664      }
2665      lastKey = splitKey;
2666    }
2667  }
2668
2669  private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
2670
2671    abstract void onFinished();
2672
2673    abstract void onError(Throwable error);
2674
2675    @Override
2676    public void accept(Void v, Throwable error) {
2677      if (error != null) {
2678        onError(error);
2679        return;
2680      }
2681      onFinished();
2682    }
2683  }
2684
2685  private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
2686    protected final TableName tableName;
2687
2688    TableProcedureBiConsumer(TableName tableName) {
2689      this.tableName = tableName;
2690    }
2691
2692    abstract String getOperationType();
2693
2694    String getDescription() {
2695      return "Operation: " + getOperationType() + ", " + "Table Name: "
2696        + tableName.getNameWithNamespaceInclAsString();
2697    }
2698
2699    @Override
2700    void onFinished() {
2701      LOG.info(getDescription() + " completed");
2702    }
2703
2704    @Override
2705    void onError(Throwable error) {
2706      LOG.info(getDescription() + " failed with " + error.getMessage());
2707    }
2708  }
2709
2710  private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer {
2711    protected final String namespaceName;
2712
2713    NamespaceProcedureBiConsumer(String namespaceName) {
2714      this.namespaceName = namespaceName;
2715    }
2716
2717    abstract String getOperationType();
2718
2719    String getDescription() {
2720      return "Operation: " + getOperationType() + ", Namespace: " + namespaceName;
2721    }
2722
2723    @Override
2724    void onFinished() {
2725      LOG.info(getDescription() + " completed");
2726    }
2727
2728    @Override
2729    void onError(Throwable error) {
2730      LOG.info(getDescription() + " failed with " + error.getMessage());
2731    }
2732  }
2733
2734  private static class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2735
2736    CreateTableProcedureBiConsumer(TableName tableName) {
2737      super(tableName);
2738    }
2739
2740    @Override
2741    String getOperationType() {
2742      return "CREATE";
2743    }
2744  }
2745
2746  private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
2747
2748    ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2749      super(tableName);
2750    }
2751
2752    @Override
2753    String getOperationType() {
2754      return "MODIFY";
2755    }
2756  }
2757
2758  private static class ModifyTableStoreFileTrackerProcedureBiConsumer
2759    extends TableProcedureBiConsumer {
2760
2761    ModifyTableStoreFileTrackerProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
2762      super(tableName);
2763    }
2764
2765    @Override
2766    String getOperationType() {
2767      return "MODIFY_TABLE_STORE_FILE_TRACKER";
2768    }
2769  }
2770
2771  private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
2772
2773    DeleteTableProcedureBiConsumer(TableName tableName) {
2774      super(tableName);
2775    }
2776
2777    @Override
2778    String getOperationType() {
2779      return "DELETE";
2780    }
2781
2782    @Override
2783    void onFinished() {
2784      connection.getLocator().clearCache(this.tableName);
2785      super.onFinished();
2786    }
2787  }
2788
2789  private static class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
2790
2791    TruncateTableProcedureBiConsumer(TableName tableName) {
2792      super(tableName);
2793    }
2794
2795    @Override
2796    String getOperationType() {
2797      return "TRUNCATE";
2798    }
2799  }
2800
2801  private static class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2802
2803    EnableTableProcedureBiConsumer(TableName tableName) {
2804      super(tableName);
2805    }
2806
2807    @Override
2808    String getOperationType() {
2809      return "ENABLE";
2810    }
2811  }
2812
2813  private static class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
2814
2815    DisableTableProcedureBiConsumer(TableName tableName) {
2816      super(tableName);
2817    }
2818
2819    @Override
2820    String getOperationType() {
2821      return "DISABLE";
2822    }
2823  }
2824
2825  private static class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2826
2827    AddColumnFamilyProcedureBiConsumer(TableName tableName) {
2828      super(tableName);
2829    }
2830
2831    @Override
2832    String getOperationType() {
2833      return "ADD_COLUMN_FAMILY";
2834    }
2835  }
2836
2837  private static class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2838
2839    DeleteColumnFamilyProcedureBiConsumer(TableName tableName) {
2840      super(tableName);
2841    }
2842
2843    @Override
2844    String getOperationType() {
2845      return "DELETE_COLUMN_FAMILY";
2846    }
2847  }
2848
2849  private static class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
2850
2851    ModifyColumnFamilyProcedureBiConsumer(TableName tableName) {
2852      super(tableName);
2853    }
2854
2855    @Override
2856    String getOperationType() {
2857      return "MODIFY_COLUMN_FAMILY";
2858    }
2859  }
2860
2861  private static class ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer
2862    extends TableProcedureBiConsumer {
2863
2864    ModifyColumnFamilyStoreFileTrackerProcedureBiConsumer(TableName tableName) {
2865      super(tableName);
2866    }
2867
2868    @Override
2869    String getOperationType() {
2870      return "MODIFY_COLUMN_FAMILY_STORE_FILE_TRACKER";
2871    }
2872  }
2873
2874  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
2875
2876    FlushTableProcedureBiConsumer(TableName tableName) {
2877      super(tableName);
2878    }
2879
2880    @Override
2881    String getOperationType() {
2882      return "FLUSH";
2883    }
2884  }
2885
2886  private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2887
2888    CreateNamespaceProcedureBiConsumer(String namespaceName) {
2889      super(namespaceName);
2890    }
2891
2892    @Override
2893    String getOperationType() {
2894      return "CREATE_NAMESPACE";
2895    }
2896  }
2897
2898  private static class DeleteNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2899
2900    DeleteNamespaceProcedureBiConsumer(String namespaceName) {
2901      super(namespaceName);
2902    }
2903
2904    @Override
2905    String getOperationType() {
2906      return "DELETE_NAMESPACE";
2907    }
2908  }
2909
2910  private static class ModifyNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
2911
2912    ModifyNamespaceProcedureBiConsumer(String namespaceName) {
2913      super(namespaceName);
2914    }
2915
2916    @Override
2917    String getOperationType() {
2918      return "MODIFY_NAMESPACE";
2919    }
2920  }
2921
2922  private static class MergeTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2923
2924    MergeTableRegionProcedureBiConsumer(TableName tableName) {
2925      super(tableName);
2926    }
2927
2928    @Override
2929    String getOperationType() {
2930      return "MERGE_REGIONS";
2931    }
2932  }
2933
2934  private static class SplitTableRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2935
2936    SplitTableRegionProcedureBiConsumer(TableName tableName) {
2937      super(tableName);
2938    }
2939
2940    @Override
2941    String getOperationType() {
2942      return "SPLIT_REGION";
2943    }
2944  }
2945
2946  private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {
2947
2948    TruncateRegionProcedureBiConsumer(TableName tableName) {
2949      super(tableName);
2950    }
2951
2952    @Override
2953    String getOperationType() {
2954      return "TRUNCATE_REGION";
2955    }
2956  }
2957
2958  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
2959    SnapshotProcedureBiConsumer(TableName tableName) {
2960      super(tableName);
2961    }
2962
2963    @Override
2964    String getOperationType() {
2965      return "SNAPSHOT";
2966    }
2967  }
2968
2969  private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
2970    private final String peerId;
2971    private final Supplier<String> getOperation;
2972
2973    ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) {
2974      this.peerId = peerId;
2975      this.getOperation = getOperation;
2976    }
2977
2978    String getDescription() {
2979      return "Operation: " + getOperation.get() + ", peerId: " + peerId;
2980    }
2981
2982    @Override
2983    void onFinished() {
2984      LOG.info(getDescription() + " completed");
2985    }
2986
2987    @Override
2988    void onError(Throwable error) {
2989      LOG.info(getDescription() + " failed with " + error.getMessage());
2990    }
2991  }
2992
2993  private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
2994    CompletableFuture<Void> future = new CompletableFuture<>();
2995    addListener(procFuture, (procId, error) -> {
2996      if (error != null) {
2997        future.completeExceptionally(error);
2998        return;
2999      }
3000      getProcedureResult(procId, future, 0);
3001    });
3002    return future;
3003  }
3004
3005  private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) {
3006    addListener(
3007      this.<GetProcedureResultResponse> newMasterCaller()
3008        .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse,
3009          GetProcedureResultResponse> call(controller, stub,
3010            GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
3011            (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
3012        .call(),
3013      (response, error) -> {
3014        if (error != null) {
3015          LOG.warn("failed to get the procedure result procId={}", procId,
3016            ConnectionUtils.translateException(error));
3017          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
3018            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3019          return;
3020        }
3021        if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
3022          retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1),
3023            ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3024          return;
3025        }
3026        if (response.hasException()) {
3027          IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
3028          future.completeExceptionally(ioe);
3029        } else {
3030          future.complete(null);
3031        }
3032      });
3033  }
3034
3035  private <T> CompletableFuture<T> failedFuture(Throwable error) {
3036    CompletableFuture<T> future = new CompletableFuture<>();
3037    future.completeExceptionally(error);
3038    return future;
3039  }
3040
3041  private <T> boolean completeExceptionally(CompletableFuture<T> future, Throwable error) {
3042    if (error != null) {
3043      future.completeExceptionally(error);
3044      return true;
3045    }
3046    return false;
3047  }
3048
3049  @Override
3050  public CompletableFuture<ClusterMetrics> getClusterMetrics() {
3051    return getClusterMetrics(EnumSet.allOf(Option.class));
3052  }
3053
3054  @Override
3055  public CompletableFuture<ClusterMetrics> getClusterMetrics(EnumSet<Option> options) {
3056    return this.<ClusterMetrics> newMasterCaller()
3057      .action((controller, stub) -> this.<GetClusterStatusRequest, GetClusterStatusResponse,
3058        ClusterMetrics> call(controller, stub,
3059          RequestConverter.buildGetClusterStatusRequest(options),
3060          (s, c, req, done) -> s.getClusterStatus(c, req, done),
3061          resp -> ClusterMetricsBuilder.toClusterMetrics(resp.getClusterStatus())))
3062      .call();
3063  }
3064
3065  @Override
3066  public CompletableFuture<Void> shutdown() {
3067    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3068      .action((controller, stub) -> this.<ShutdownRequest, ShutdownResponse, Void> call(controller,
3069        stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done),
3070        resp -> null))
3071      .call();
3072  }
3073
3074  @Override
3075  public CompletableFuture<Void> stopMaster() {
3076    return this.<Void> newMasterCaller().priority(HIGH_QOS)
3077      .action((controller, stub) -> this.<StopMasterRequest, StopMasterResponse, Void> call(
3078        controller, stub, StopMasterRequest.newBuilder().build(),
3079        (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null))
3080      .call();
3081  }
3082
3083  @Override
3084  public CompletableFuture<Void> stopRegionServer(ServerName serverName) {
3085    StopServerRequest request = RequestConverter
3086      .buildStopServerRequest("Called by admin client " + this.connection.toString());
3087    return this.<Void> newAdminCaller().priority(HIGH_QOS)
3088      .action((controller, stub) -> this.<StopServerRequest, StopServerResponse, Void> adminCall(
3089        controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done),
3090        resp -> null))
3091      .serverName(serverName).call();
3092  }
3093
3094  @Override
3095  public CompletableFuture<Void> updateConfiguration(ServerName serverName) {
3096    return this.<Void> newAdminCaller()
3097      .action((controller, stub) -> this.<UpdateConfigurationRequest, UpdateConfigurationResponse,
3098        Void> adminCall(controller, stub, UpdateConfigurationRequest.getDefaultInstance(),
3099          (s, c, req, done) -> s.updateConfiguration(controller, req, done), resp -> null))
3100      .serverName(serverName).call();
3101  }
3102
3103  @Override
3104  public CompletableFuture<Void> updateConfiguration() {
3105    CompletableFuture<Void> future = new CompletableFuture<Void>();
3106    addListener(
3107      getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.MASTER, Option.BACKUP_MASTERS)),
3108      (status, err) -> {
3109        if (err != null) {
3110          future.completeExceptionally(err);
3111        } else {
3112          List<CompletableFuture<Void>> futures = new ArrayList<>();
3113          status.getServersName().forEach(server -> futures.add(updateConfiguration(server)));
3114          futures.add(updateConfiguration(status.getMasterName()));
3115          status.getBackupMasterNames().forEach(master -> futures.add(updateConfiguration(master)));
3116          addListener(
3117            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3118            (result, err2) -> {
3119              if (err2 != null) {
3120                future.completeExceptionally(err2);
3121              } else {
3122                future.complete(result);
3123              }
3124            });
3125        }
3126      });
3127    return future;
3128  }
3129
3130  @Override
3131  public CompletableFuture<Void> updateConfiguration(String groupName) {
3132    CompletableFuture<Void> future = new CompletableFuture<Void>();
3133    addListener(getRSGroup(groupName), (rsGroupInfo, err) -> {
3134      if (err != null) {
3135        future.completeExceptionally(err);
3136      } else if (rsGroupInfo == null) {
3137        future.completeExceptionally(
3138          new IllegalArgumentException("Group does not exist: " + groupName));
3139      } else {
3140        addListener(getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)), (status, err2) -> {
3141          if (err2 != null) {
3142            future.completeExceptionally(err2);
3143          } else {
3144            List<CompletableFuture<Void>> futures = new ArrayList<>();
3145            List<ServerName> groupServers = status.getServersName().stream()
3146              .filter(s -> rsGroupInfo.containsServer(s.getAddress())).collect(Collectors.toList());
3147            groupServers.forEach(server -> futures.add(updateConfiguration(server)));
3148            addListener(
3149              CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3150              (result, err3) -> {
3151                if (err3 != null) {
3152                  future.completeExceptionally(err3);
3153                } else {
3154                  future.complete(result);
3155                }
3156              });
3157          }
3158        });
3159      }
3160    });
3161    return future;
3162  }
3163
3164  @Override
3165  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
3166    return this.<Void> newAdminCaller()
3167      .action((controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse,
3168        Void> adminCall(controller, stub, RequestConverter.buildRollWALWriterRequest(),
3169          (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
3170      .serverName(serverName).call();
3171  }
3172
3173  @Override
3174  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
3175    return this.<Void> newAdminCaller()
3176      .action((controller, stub) -> this.<ClearCompactionQueuesRequest,
3177        ClearCompactionQueuesResponse, Void> adminCall(controller, stub,
3178          RequestConverter.buildClearCompactionQueuesRequest(queues),
3179          (s, c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
3180      .serverName(serverName).call();
3181  }
3182
3183  @Override
3184  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
3185    return this.<List<SecurityCapability>> newMasterCaller()
3186      .action((controller, stub) -> this.<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse,
3187        List<SecurityCapability>> call(controller, stub,
3188          SecurityCapabilitiesRequest.newBuilder().build(),
3189          (s, c, req, done) -> s.getSecurityCapabilities(c, req, done),
3190          (resp) -> ProtobufUtil.toSecurityCapabilityList(resp.getCapabilitiesList())))
3191      .call();
3192  }
3193
3194  @Override
3195  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName) {
3196    return getRegionMetrics(GetRegionLoadRequest.newBuilder().build(), serverName);
3197  }
3198
3199  @Override
3200  public CompletableFuture<List<RegionMetrics>> getRegionMetrics(ServerName serverName,
3201    TableName tableName) {
3202    Preconditions.checkNotNull(tableName,
3203      "tableName is null. If you don't specify a tableName, use getRegionLoads() instead");
3204    return getRegionMetrics(RequestConverter.buildGetRegionLoadRequest(tableName), serverName);
3205  }
3206
3207  private CompletableFuture<List<RegionMetrics>> getRegionMetrics(GetRegionLoadRequest request,
3208    ServerName serverName) {
3209    return this.<List<RegionMetrics>> newAdminCaller()
3210      .action((controller, stub) -> this.<GetRegionLoadRequest, GetRegionLoadResponse,
3211        List<RegionMetrics>> adminCall(controller, stub, request,
3212          (s, c, req, done) -> s.getRegionLoad(controller, req, done),
3213          RegionMetricsBuilder::toRegionMetrics))
3214      .serverName(serverName).call();
3215  }
3216
3217  @Override
3218  public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
3219    return this.<Boolean> newMasterCaller()
3220      .action((controller, stub) -> this.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse,
3221        Boolean> call(controller, stub, IsInMaintenanceModeRequest.newBuilder().build(),
3222          (s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
3223          resp -> resp.getInMaintenanceMode()))
3224      .call();
3225  }
3226
  /**
   * Determine the compaction state of a table for the given compaction type.
   * <ul>
   * <li>{@code MOB}: the active master is looked up through the connection registry and asked
   * for the region info of the table's MOB region; the compaction state carried by the response
   * is returned, or {@code NONE} when the response has none.</li>
   * <li>{@code NORMAL}: the compaction state of every located, non-offline region is queried and
   * the results are combined: any {@code MAJOR_AND_MINOR} region, or a mix of {@code MAJOR} and
   * {@code MINOR} regions, yields {@code MAJOR_AND_MINOR}; otherwise the result is
   * {@code MAJOR}, {@code MINOR} or {@code NONE} depending on what was seen.</li>
   * </ul>
   * Lookup and RPC failures are reported through the returned future. An unrecognized
   * {@code compactType} is reported differently: an {@link IllegalArgumentException} is thrown
   * directly from this method instead of failing the future.
   * @param tableName   table to inspect
   * @param compactType which kind of compaction to report on
   * @return the compaction state, wrapped by a {@link CompletableFuture}
   */
  @Override
  public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
    CompactType compactType) {
    CompletableFuture<CompactionState> future = new CompletableFuture<>();

    switch (compactType) {
      case MOB:
        // The MOB query is addressed to the active master, so resolve it first.
        addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);

          // NOTE(review): the 'true' argument presumably asks for the compaction state to be
          // included in the response -- confirm against RequestConverter.
          addListener(this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName)
            .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
              GetRegionInfoResponse> adminCall(controller, stub,
                RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
                (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
            .call(), (resp2, err2) -> {
              if (err2 != null) {
                future.completeExceptionally(err2);
              } else {
                if (resp2.hasCompactionState()) {
                  future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
                } else {
                  future.complete(CompactionState.NONE);
                }
              }
            });
        });
        break;
      case NORMAL:
        addListener(getTableHRegionLocations(tableName), (locations, err) -> {
          if (err != null) {
            future.completeExceptionally(err);
            return;
          }
          // Per-region states that are not decisive on their own are collected here and
          // combined once every per-region query has finished.
          ConcurrentLinkedQueue<CompactionState> regionStates = new ConcurrentLinkedQueue<>();
          List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
          // Only regions that have a server, have region info and are not offline are queried.
          locations.stream().filter(loc -> loc.getServerName() != null)
            .filter(loc -> loc.getRegion() != null).filter(loc -> !loc.getRegion().isOffline())
            .map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
              futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
                // If any region compaction state is MAJOR_AND_MINOR
                // the table compaction state is MAJOR_AND_MINOR, too.
                if (err2 != null) {
                  future.completeExceptionally(unwrapCompletionException(err2));
                } else if (regionState == CompactionState.MAJOR_AND_MINOR) {
                  future.complete(regionState);
                } else {
                  regionStates.add(regionState);
                }
              }));
            });
          // err3 is not inspected here: a failed per-region query has already failed 'future'
          // in the whenComplete callback above.
          addListener(
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
            (ret, err3) -> {
              // If future not completed, check all regions's compaction state
              if (!future.isCompletedExceptionally() && !future.isDone()) {
                CompactionState state = CompactionState.NONE;
                for (CompactionState regionState : regionStates) {
                  switch (regionState) {
                    case MAJOR:
                      // MAJOR after an earlier MINOR means both kinds are running.
                      if (state == CompactionState.MINOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MAJOR;
                      }
                      break;
                    case MINOR:
                      // MINOR after an earlier MAJOR means both kinds are running.
                      if (state == CompactionState.MAJOR) {
                        future.complete(CompactionState.MAJOR_AND_MINOR);
                      } else {
                        state = CompactionState.MINOR;
                      }
                      break;
                    case NONE:
                    default:
                  }
                }
                // Not completed inside the loop, so only one kind (or none) was seen.
                if (!future.isDone()) {
                  future.complete(state);
                }
              }
            });
        });
        break;
      default:
        // Thrown synchronously to the caller rather than reported through the future.
        throw new IllegalArgumentException("Unknown compactType: " + compactType);
    }

    return future;
  }
3321
3322  @Override
3323  public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
3324    CompletableFuture<CompactionState> future = new CompletableFuture<>();
3325    addListener(getRegionLocation(regionName), (location, err) -> {
3326      if (err != null) {
3327        future.completeExceptionally(err);
3328        return;
3329      }
3330      ServerName serverName = location.getServerName();
3331      if (serverName == null) {
3332        future
3333          .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
3334        return;
3335      }
3336      addListener(
3337        this.<GetRegionInfoResponse> newAdminCaller()
3338          .action((controller, stub) -> this.<GetRegionInfoRequest, GetRegionInfoResponse,
3339            GetRegionInfoResponse> adminCall(controller, stub,
3340              RequestConverter.buildGetRegionInfoRequest(location.getRegion().getRegionName(),
3341                true),
3342              (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp))
3343          .serverName(serverName).call(),
3344        (resp2, err2) -> {
3345          if (err2 != null) {
3346            future.completeExceptionally(err2);
3347          } else {
3348            if (resp2.hasCompactionState()) {
3349              future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
3350            } else {
3351              future.complete(CompactionState.NONE);
3352            }
3353          }
3354        });
3355    });
3356    return future;
3357  }
3358
3359  @Override
3360  public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
3361    MajorCompactionTimestampRequest request = MajorCompactionTimestampRequest.newBuilder()
3362      .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
3363    return this.<Optional<Long>> newMasterCaller()
3364      .action((controller, stub) -> this.<MajorCompactionTimestampRequest,
3365        MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, request,
3366          (s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
3367          ProtobufUtil::toOptionalTimestamp))
3368      .call();
3369  }
3370
3371  @Override
3372  public CompletableFuture<Optional<Long>>
3373    getLastMajorCompactionTimestampForRegion(byte[] regionName) {
3374    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
3375    // regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
3376    addListener(getRegionInfo(regionName), (region, err) -> {
3377      if (err != null) {
3378        future.completeExceptionally(err);
3379        return;
3380      }
3381      MajorCompactionTimestampForRegionRequest.Builder builder =
3382        MajorCompactionTimestampForRegionRequest.newBuilder();
3383      builder.setRegion(
3384        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
3385      addListener(this.<Optional<Long>> newMasterCaller()
3386        .action((controller, stub) -> this.<MajorCompactionTimestampForRegionRequest,
3387          MajorCompactionTimestampResponse, Optional<Long>> call(controller, stub, builder.build(),
3388            (s, c, req, done) -> s.getLastMajorCompactionTimestampForRegion(c, req, done),
3389            ProtobufUtil::toOptionalTimestamp))
3390        .call(), (timestamp, err2) -> {
3391          if (err2 != null) {
3392            future.completeExceptionally(err2);
3393          } else {
3394            future.complete(timestamp);
3395          }
3396        });
3397    });
3398    return future;
3399  }
3400
3401  @Override
3402  public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
3403    List<String> serverNamesList) {
3404    CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
3405    addListener(getRegionServerList(serverNamesList), (serverNames, err) -> {
3406      if (err != null) {
3407        future.completeExceptionally(err);
3408        return;
3409      }
3410      // Accessed by multiple threads.
3411      Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
3412      List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
3413      serverNames.stream().forEach(serverName -> {
3414        futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
3415          if (err2 != null) {
3416            future.completeExceptionally(unwrapCompletionException(err2));
3417          } else {
3418            serverStates.put(serverName, serverState);
3419          }
3420        }));
3421      });
3422      addListener(
3423        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3424        (ret, err3) -> {
3425          if (!future.isCompletedExceptionally()) {
3426            if (err3 != null) {
3427              future.completeExceptionally(err3);
3428            } else {
3429              future.complete(serverStates);
3430            }
3431          }
3432        });
3433    });
3434    return future;
3435  }
3436
3437  private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
3438    CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
3439    if (serverNamesList.isEmpty()) {
3440      CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
3441        getClusterMetrics(EnumSet.of(Option.SERVERS_NAME));
3442      addListener(clusterMetricsCompletableFuture, (clusterMetrics, err) -> {
3443        if (err != null) {
3444          future.completeExceptionally(err);
3445        } else {
3446          future.complete(clusterMetrics.getServersName());
3447        }
3448      });
3449      return future;
3450    } else {
3451      List<ServerName> serverList = new ArrayList<>();
3452      for (String regionServerName : serverNamesList) {
3453        ServerName serverName = null;
3454        try {
3455          serverName = ServerName.valueOf(regionServerName);
3456        } catch (Exception e) {
3457          future.completeExceptionally(
3458            new IllegalArgumentException(String.format("ServerName format: %s", regionServerName)));
3459        }
3460        if (serverName == null) {
3461          future.completeExceptionally(
3462            new IllegalArgumentException(String.format("Null ServerName: %s", regionServerName)));
3463        } else {
3464          serverList.add(serverName);
3465        }
3466      }
3467      future.complete(serverList);
3468    }
3469    return future;
3470  }
3471
3472  private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
3473    return this.<Boolean> newAdminCaller().serverName(serverName)
3474      .action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
3475        Boolean> adminCall(controller, stub,
3476          CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(),
3477          (s, c, req, done) -> s.compactionSwitch(c, req, done), resp -> resp.getPrevState()))
3478      .call();
3479  }
3480
3481  @Override
3482  public CompletableFuture<Boolean> balancerSwitch(boolean on, boolean drainRITs) {
3483    return this.<Boolean> newMasterCaller()
3484      .action((controller, stub) -> this.<SetBalancerRunningRequest, SetBalancerRunningResponse,
3485        Boolean> call(controller, stub,
3486          RequestConverter.buildSetBalancerRunningRequest(on, drainRITs),
3487          (s, c, req, done) -> s.setBalancerRunning(c, req, done),
3488          (resp) -> resp.getPrevBalanceValue()))
3489      .call();
3490  }
3491
3492  @Override
3493  public CompletableFuture<BalanceResponse> balance(BalanceRequest request) {
3494    return this.<BalanceResponse> newMasterCaller()
3495      .action((controller, stub) -> this.<MasterProtos.BalanceRequest, MasterProtos.BalanceResponse,
3496        BalanceResponse> call(controller, stub, ProtobufUtil.toBalanceRequest(request),
3497          (s, c, req, done) -> s.balance(c, req, done),
3498          (resp) -> ProtobufUtil.toBalanceResponse(resp)))
3499      .call();
3500  }
3501
3502  @Override
3503  public CompletableFuture<Boolean> isBalancerEnabled() {
3504    return this.<Boolean> newMasterCaller()
3505      .action((controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse,
3506        Boolean> call(controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
3507          (s, c, req, done) -> s.isBalancerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3508      .call();
3509  }
3510
3511  @Override
3512  public CompletableFuture<Boolean> normalizerSwitch(boolean on) {
3513    return this.<Boolean> newMasterCaller()
3514      .action((controller, stub) -> this.<SetNormalizerRunningRequest, SetNormalizerRunningResponse,
3515        Boolean> call(controller, stub, RequestConverter.buildSetNormalizerRunningRequest(on),
3516          (s, c, req, done) -> s.setNormalizerRunning(c, req, done),
3517          (resp) -> resp.getPrevNormalizerValue()))
3518      .call();
3519  }
3520
3521  @Override
3522  public CompletableFuture<Boolean> isNormalizerEnabled() {
3523    return this.<Boolean> newMasterCaller()
3524      .action((controller, stub) -> this.<IsNormalizerEnabledRequest, IsNormalizerEnabledResponse,
3525        Boolean> call(controller, stub, RequestConverter.buildIsNormalizerEnabledRequest(),
3526          (s, c, req, done) -> s.isNormalizerEnabled(c, req, done), (resp) -> resp.getEnabled()))
3527      .call();
3528  }
3529
3530  @Override
3531  public CompletableFuture<Boolean> normalize(NormalizeTableFilterParams ntfp) {
3532    return normalize(RequestConverter.buildNormalizeRequest(ntfp));
3533  }
3534
3535  private CompletableFuture<Boolean> normalize(NormalizeRequest request) {
3536    return this.<Boolean> newMasterCaller().action((controller, stub) -> this.call(controller, stub,
3537      request, MasterService.Interface::normalize, NormalizeResponse::getNormalizerRan)).call();
3538  }
3539
3540  @Override
3541  public CompletableFuture<Boolean> cleanerChoreSwitch(boolean enabled) {
3542    return this.<Boolean> newMasterCaller()
3543      .action((controller, stub) -> this.<SetCleanerChoreRunningRequest,
3544        SetCleanerChoreRunningResponse, Boolean> call(controller, stub,
3545          RequestConverter.buildSetCleanerChoreRunningRequest(enabled),
3546          (s, c, req, done) -> s.setCleanerChoreRunning(c, req, done),
3547          (resp) -> resp.getPrevValue()))
3548      .call();
3549  }
3550
3551  @Override
3552  public CompletableFuture<Boolean> isCleanerChoreEnabled() {
3553    return this.<Boolean> newMasterCaller()
3554      .action(
3555        (controller, stub) -> this.<IsCleanerChoreEnabledRequest, IsCleanerChoreEnabledResponse,
3556          Boolean> call(controller, stub, RequestConverter.buildIsCleanerChoreEnabledRequest(),
3557            (s, c, req, done) -> s.isCleanerChoreEnabled(c, req, done), (resp) -> resp.getValue()))
3558      .call();
3559  }
3560
3561  @Override
3562  public CompletableFuture<Boolean> runCleanerChore() {
3563    return this.<Boolean> newMasterCaller()
3564      .action((controller, stub) -> this.<RunCleanerChoreRequest, RunCleanerChoreResponse,
3565        Boolean> call(controller, stub, RequestConverter.buildRunCleanerChoreRequest(),
3566          (s, c, req, done) -> s.runCleanerChore(c, req, done),
3567          (resp) -> resp.getCleanerChoreRan()))
3568      .call();
3569  }
3570
3571  @Override
3572  public CompletableFuture<Boolean> catalogJanitorSwitch(boolean enabled) {
3573    return this.<Boolean> newMasterCaller()
3574      .action((controller, stub) -> this.<EnableCatalogJanitorRequest, EnableCatalogJanitorResponse,
3575        Boolean> call(controller, stub, RequestConverter.buildEnableCatalogJanitorRequest(enabled),
3576          (s, c, req, done) -> s.enableCatalogJanitor(c, req, done), (resp) -> resp.getPrevValue()))
3577      .call();
3578  }
3579
3580  @Override
3581  public CompletableFuture<Boolean> isCatalogJanitorEnabled() {
3582    return this.<Boolean> newMasterCaller()
3583      .action((controller, stub) -> this.<IsCatalogJanitorEnabledRequest,
3584        IsCatalogJanitorEnabledResponse, Boolean> call(controller, stub,
3585          RequestConverter.buildIsCatalogJanitorEnabledRequest(),
3586          (s, c, req, done) -> s.isCatalogJanitorEnabled(c, req, done), (resp) -> resp.getValue()))
3587      .call();
3588  }
3589
3590  @Override
3591  public CompletableFuture<Integer> runCatalogJanitor() {
3592    return this.<Integer> newMasterCaller()
3593      .action((controller, stub) -> this.<RunCatalogScanRequest, RunCatalogScanResponse,
3594        Integer> call(controller, stub, RequestConverter.buildCatalogScanRequest(),
3595          (s, c, req, done) -> s.runCatalogScan(c, req, done), (resp) -> resp.getScanResult()))
3596      .call();
3597  }
3598
3599  @Override
3600  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3601    ServiceCaller<S, R> callable) {
3602    MasterCoprocessorRpcChannelImpl channel =
3603      new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
3604    S stub = stubMaker.apply(channel);
3605    CompletableFuture<R> future = new CompletableFuture<>();
3606    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3607    callable.call(stub, controller, resp -> {
3608      if (controller.failed()) {
3609        future.completeExceptionally(controller.getFailed());
3610      } else {
3611        future.complete(resp);
3612      }
3613    });
3614    return future;
3615  }
3616
3617  @Override
3618  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
3619    ServiceCaller<S, R> callable, ServerName serverName) {
3620    RegionServerCoprocessorRpcChannelImpl channel = new RegionServerCoprocessorRpcChannelImpl(
3621      this.<Message> newServerCaller().serverName(serverName));
3622    S stub = stubMaker.apply(channel);
3623    CompletableFuture<R> future = new CompletableFuture<>();
3624    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
3625    callable.call(stub, controller, resp -> {
3626      if (controller.failed()) {
3627        future.completeExceptionally(controller.getFailed());
3628      } else {
3629        future.complete(resp);
3630      }
3631    });
3632    return future;
3633  }
3634
3635  @Override
3636  public CompletableFuture<List<ServerName>> clearDeadServers(List<ServerName> servers) {
3637    return this.<List<ServerName>> newMasterCaller()
3638      .action((controller, stub) -> this.<ClearDeadServersRequest, ClearDeadServersResponse,
3639        List<ServerName>> call(controller, stub,
3640          RequestConverter.buildClearDeadServersRequest(servers),
3641          (s, c, req, done) -> s.clearDeadServers(c, req, done),
3642          (resp) -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
3643      .call();
3644  }
3645
  /**
   * Creates a server request caller builder from the connection's caller factory, configured with
   * this admin's rpc timeout, operation timeout, pause, pause for server overloaded, max attempts
   * and start-log-errors count. Callers still have to set the target server and the action.
   * @param <T> result type of the call to be built
   */
  <T> ServerRequestCallerBuilder<T> newServerCaller() {
    return this.connection.callerFactory.<T> serverRequest()
      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
      .pause(pauseNs, TimeUnit.NANOSECONDS)
      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
  }
3654
3655  @Override
3656  public CompletableFuture<Void> enableTableReplication(TableName tableName) {
3657    if (tableName == null) {
3658      return failedFuture(new IllegalArgumentException("Table name is null"));
3659    }
3660    CompletableFuture<Void> future = new CompletableFuture<>();
3661    addListener(tableExists(tableName), (exist, err) -> {
3662      if (err != null) {
3663        future.completeExceptionally(err);
3664        return;
3665      }
3666      if (!exist) {
3667        future.completeExceptionally(new TableNotFoundException(
3668          "Table '" + tableName.getNameAsString() + "' does not exists."));
3669        return;
3670      }
3671      addListener(getTableSplits(tableName), (splits, err1) -> {
3672        if (err1 != null) {
3673          future.completeExceptionally(err1);
3674        } else {
3675          addListener(checkAndSyncTableToPeerClusters(tableName, splits), (result, err2) -> {
3676            if (err2 != null) {
3677              future.completeExceptionally(err2);
3678            } else {
3679              addListener(setTableReplication(tableName, true), (result3, err3) -> {
3680                if (err3 != null) {
3681                  future.completeExceptionally(err3);
3682                } else {
3683                  future.complete(result3);
3684                }
3685              });
3686            }
3687          });
3688        }
3689      });
3690    });
3691    return future;
3692  }
3693
3694  @Override
3695  public CompletableFuture<Void> disableTableReplication(TableName tableName) {
3696    if (tableName == null) {
3697      return failedFuture(new IllegalArgumentException("Table name is null"));
3698    }
3699    CompletableFuture<Void> future = new CompletableFuture<>();
3700    addListener(tableExists(tableName), (exist, err) -> {
3701      if (err != null) {
3702        future.completeExceptionally(err);
3703        return;
3704      }
3705      if (!exist) {
3706        future.completeExceptionally(new TableNotFoundException(
3707          "Table '" + tableName.getNameAsString() + "' does not exists."));
3708        return;
3709      }
3710      addListener(setTableReplication(tableName, false), (result, err2) -> {
3711        if (err2 != null) {
3712          future.completeExceptionally(err2);
3713        } else {
3714          future.complete(result);
3715        }
3716      });
3717    });
3718    return future;
3719  }
3720
3721  private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
3722    CompletableFuture<byte[][]> future = new CompletableFuture<>();
3723    addListener(
3724      getRegions(tableName).thenApply(regions -> regions.stream()
3725        .filter(RegionReplicaUtil::isDefaultReplica).collect(Collectors.toList())),
3726      (regions, err2) -> {
3727        if (err2 != null) {
3728          future.completeExceptionally(err2);
3729          return;
3730        }
3731        if (regions.size() == 1) {
3732          future.complete(null);
3733        } else {
3734          byte[][] splits = new byte[regions.size() - 1][];
3735          for (int i = 1; i < regions.size(); i++) {
3736            splits[i - 1] = regions.get(i).getStartKey();
3737          }
3738          future.complete(splits);
3739        }
3740      });
3741    return future;
3742  }
3743
3744  /**
3745   * Connect to peer and check the table descriptor on peer:
3746   * <ol>
3747   * <li>Create the same table on peer when not exist.</li>
3748   * <li>Throw an exception if the table already has replication enabled on any of the column
3749   * families.</li>
3750   * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
3751   * </ol>
3752   * @param tableName name of the table to sync to the peer
3753   * @param splits    table split keys
3754   */
3755  private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
3756    byte[][] splits) {
3757    CompletableFuture<Void> future = new CompletableFuture<>();
3758    addListener(listReplicationPeers(), (peers, err) -> {
3759      if (err != null) {
3760        future.completeExceptionally(err);
3761        return;
3762      }
3763      if (peers == null || peers.size() <= 0) {
3764        future.completeExceptionally(
3765          new IllegalArgumentException("Found no peer cluster for replication."));
3766        return;
3767      }
3768      List<CompletableFuture<Void>> futures = new ArrayList<>();
3769      peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
3770        .forEach(peer -> {
3771          futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
3772        });
3773      addListener(
3774        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])),
3775        (result, err2) -> {
3776          if (err2 != null) {
3777            future.completeExceptionally(err2);
3778          } else {
3779            future.complete(result);
3780          }
3781        });
3782    });
3783    return future;
3784  }
3785
3786  private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
3787    ReplicationPeerDescription peer) {
3788    Configuration peerConf = null;
3789    try {
3790      peerConf =
3791        ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
3792    } catch (IOException e) {
3793      return failedFuture(e);
3794    }
3795    CompletableFuture<Void> future = new CompletableFuture<>();
3796    addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
3797      if (err != null) {
3798        future.completeExceptionally(err);
3799        return;
3800      }
3801      addListener(getDescriptor(tableName), (tableDesc, err1) -> {
3802        if (err1 != null) {
3803          future.completeExceptionally(err1);
3804          return;
3805        }
3806        AsyncAdmin peerAdmin = conn.getAdmin();
3807        addListener(peerAdmin.tableExists(tableName), (exist, err2) -> {
3808          if (err2 != null) {
3809            future.completeExceptionally(err2);
3810            return;
3811          }
3812          if (!exist) {
3813            CompletableFuture<Void> createTableFuture = null;
3814            if (splits == null) {
3815              createTableFuture = peerAdmin.createTable(tableDesc);
3816            } else {
3817              createTableFuture = peerAdmin.createTable(tableDesc, splits);
3818            }
3819            addListener(createTableFuture, (result, err3) -> {
3820              if (err3 != null) {
3821                future.completeExceptionally(err3);
3822              } else {
3823                future.complete(result);
3824              }
3825            });
3826          } else {
3827            addListener(compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin),
3828              (result, err4) -> {
3829                if (err4 != null) {
3830                  future.completeExceptionally(err4);
3831                } else {
3832                  future.complete(result);
3833                }
3834              });
3835          }
3836        });
3837      });
3838    });
3839    return future;
3840  }
3841
3842  private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
3843    TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
3844    CompletableFuture<Void> future = new CompletableFuture<>();
3845    addListener(peerAdmin.getDescriptor(tableName), (peerTableDesc, err) -> {
3846      if (err != null) {
3847        future.completeExceptionally(err);
3848        return;
3849      }
3850      if (peerTableDesc == null) {
3851        future.completeExceptionally(
3852          new IllegalArgumentException("Failed to get table descriptor for table "
3853            + tableName.getNameAsString() + " from peer cluster " + peer.getPeerId()));
3854        return;
3855      }
3856      if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
3857        future.completeExceptionally(new IllegalArgumentException(
3858          "Table " + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
3859            + ", but the table descriptors are not same when compared with source cluster."
3860            + " Thus can not enable the table's replication switch."));
3861        return;
3862      }
3863      future.complete(null);
3864    });
3865    return future;
3866  }
3867
3868  /**
3869   * Set the table's replication switch if the table's replication switch is already not set.
3870   * @param tableName name of the table
3871   * @param enableRep is replication switch enable or disable
3872   */
3873  private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
3874    CompletableFuture<Void> future = new CompletableFuture<>();
3875    addListener(getDescriptor(tableName), (tableDesc, err) -> {
3876      if (err != null) {
3877        future.completeExceptionally(err);
3878        return;
3879      }
3880      if (!tableDesc.matchReplicationScope(enableRep)) {
3881        int scope =
3882          enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
3883        TableDescriptor newTableDesc =
3884          TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
3885        addListener(modifyTable(newTableDesc), (result, err2) -> {
3886          if (err2 != null) {
3887            future.completeExceptionally(err2);
3888          } else {
3889            future.complete(result);
3890          }
3891        });
3892      } else {
3893        future.complete(null);
3894      }
3895    });
3896    return future;
3897  }
3898
3899  @Override
3900  public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
3901    GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
3902    request.setPeerId(peerId);
3903    return this.<Boolean> newMasterCaller()
3904      .action((controller, stub) -> this.<GetReplicationPeerStateRequest,
3905        GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
3906          (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
3907          resp -> resp.getIsEnabled()))
3908      .call();
3909  }
3910
3911  private void waitUntilAllReplicationPeerModificationProceduresDone(
3912    CompletableFuture<Boolean> future, boolean prevOn, int retries) {
3913    CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
3914      this.<List<ProcedureProtos.Procedure>> newMasterCaller()
3915        .action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
3916          GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
3917            controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
3918            (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
3919            resp -> resp.getProcedureList()))
3920        .call();
3921    addListener(callFuture, (r, e) -> {
3922      if (e != null) {
3923        future.completeExceptionally(e);
3924      } else if (r.isEmpty()) {
3925        // we are done
3926        future.complete(prevOn);
3927      } else {
3928        // retry later to see if the procedures are done
3929        retryTimer.newTimeout(
3930          t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
3931          ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
3932      }
3933    });
3934  }
3935
3936  @Override
3937  public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
3938    boolean drainProcedures) {
3939    ReplicationPeerModificationSwitchRequest request =
3940      ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
3941    CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
3942      .action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
3943        ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
3944          (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
3945          resp -> resp.getPreviousValue()))
3946      .call();
3947    // if we do not need to wait all previous peer modification procedure done, or we are enabling
3948    // peer modification, just return here.
3949    if (!drainProcedures || on) {
3950      return callFuture;
3951    }
3952    // otherwise we need to wait until all previous peer modification procedure done
3953    CompletableFuture<Boolean> future = new CompletableFuture<>();
3954    addListener(callFuture, (prevOn, err) -> {
3955      if (err != null) {
3956        future.completeExceptionally(err);
3957        return;
3958      }
3959      // even if the previous state is disabled, we still need to wait here, as there could be
3960      // another client thread which called this method just before us and have already changed the
3961      // state to off, but there are still peer modification procedures not finished, so we should
3962      // also wait here.
3963      waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
3964    });
3965    return future;
3966  }
3967
3968  @Override
3969  public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
3970    return this.<Boolean> newMasterCaller()
3971      .action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
3972        IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
3973          IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
3974          (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
3975          (resp) -> resp.getEnabled()))
3976      .call();
3977  }
3978
3979  @Override
3980  public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
3981    CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
3982    addListener(getTableHRegionLocations(tableName), (locations, err) -> {
3983      if (err != null) {
3984        future.completeExceptionally(err);
3985        return;
3986      }
3987      Map<ServerName, List<RegionInfo>> regionInfoByServerName =
3988        locations.stream().filter(l -> l.getRegion() != null)
3989          .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
3990          .collect(Collectors.groupingBy(l -> l.getServerName(),
3991            Collectors.mapping(l -> l.getRegion(), Collectors.toList())));
3992      List<CompletableFuture<CacheEvictionStats>> futures = new ArrayList<>();
3993      CacheEvictionStatsAggregator aggregator = new CacheEvictionStatsAggregator();
3994      for (Map.Entry<ServerName, List<RegionInfo>> entry : regionInfoByServerName.entrySet()) {
3995        futures
3996          .add(clearBlockCache(entry.getKey(), entry.getValue()).whenComplete((stats, err2) -> {
3997            if (err2 != null) {
3998              future.completeExceptionally(unwrapCompletionException(err2));
3999            } else {
4000              aggregator.append(stats);
4001            }
4002          }));
4003      }
4004      addListener(CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])),
4005        (ret, err3) -> {
4006          if (err3 != null) {
4007            future.completeExceptionally(unwrapCompletionException(err3));
4008          } else {
4009            future.complete(aggregator.sum());
4010          }
4011        });
4012    });
4013    return future;
4014  }
4015
4016  @Override
4017  public CompletableFuture<Void> cloneTableSchema(TableName tableName, TableName newTableName,
4018    boolean preserveSplits) {
4019    CompletableFuture<Void> future = new CompletableFuture<>();
4020    addListener(tableExists(tableName), (exist, err) -> {
4021      if (err != null) {
4022        future.completeExceptionally(err);
4023        return;
4024      }
4025      if (!exist) {
4026        future.completeExceptionally(new TableNotFoundException(tableName));
4027        return;
4028      }
4029      addListener(tableExists(newTableName), (exist1, err1) -> {
4030        if (err1 != null) {
4031          future.completeExceptionally(err1);
4032          return;
4033        }
4034        if (exist1) {
4035          future.completeExceptionally(new TableExistsException(newTableName));
4036          return;
4037        }
4038        addListener(getDescriptor(tableName), (tableDesc, err2) -> {
4039          if (err2 != null) {
4040            future.completeExceptionally(err2);
4041            return;
4042          }
4043          TableDescriptor newTableDesc = TableDescriptorBuilder.copy(newTableName, tableDesc);
4044          if (preserveSplits) {
4045            addListener(getTableSplits(tableName), (splits, err3) -> {
4046              if (err3 != null) {
4047                future.completeExceptionally(err3);
4048              } else {
4049                addListener(
4050                  splits != null ? createTable(newTableDesc, splits) : createTable(newTableDesc),
4051                  (result, err4) -> {
4052                    if (err4 != null) {
4053                      future.completeExceptionally(err4);
4054                    } else {
4055                      future.complete(result);
4056                    }
4057                  });
4058              }
4059            });
4060          } else {
4061            addListener(createTable(newTableDesc), (result, err5) -> {
4062              if (err5 != null) {
4063                future.completeExceptionally(err5);
4064              } else {
4065                future.complete(result);
4066              }
4067            });
4068          }
4069        });
4070      });
4071    });
4072    return future;
4073  }
4074
4075  private CompletableFuture<CacheEvictionStats> clearBlockCache(ServerName serverName,
4076    List<RegionInfo> hris) {
4077    return this.<CacheEvictionStats> newAdminCaller()
4078      .action((controller, stub) -> this.<ClearRegionBlockCacheRequest,
4079        ClearRegionBlockCacheResponse, CacheEvictionStats> adminCall(controller, stub,
4080          RequestConverter.buildClearRegionBlockCacheRequest(hris),
4081          (s, c, req, done) -> s.clearRegionBlockCache(controller, req, done),
4082          resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats())))
4083      .serverName(serverName).call();
4084  }
4085
4086  @Override
4087  public CompletableFuture<Boolean> switchRpcThrottle(boolean enable) {
4088    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4089      .action((controller, stub) -> this.<SwitchRpcThrottleRequest, SwitchRpcThrottleResponse,
4090        Boolean> call(controller, stub,
4091          SwitchRpcThrottleRequest.newBuilder().setRpcThrottleEnabled(enable).build(),
4092          (s, c, req, done) -> s.switchRpcThrottle(c, req, done),
4093          resp -> resp.getPreviousRpcThrottleEnabled()))
4094      .call();
4095    return future;
4096  }
4097
4098  @Override
4099  public CompletableFuture<Boolean> isRpcThrottleEnabled() {
4100    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4101      .action((controller, stub) -> this.<IsRpcThrottleEnabledRequest, IsRpcThrottleEnabledResponse,
4102        Boolean> call(controller, stub, IsRpcThrottleEnabledRequest.newBuilder().build(),
4103          (s, c, req, done) -> s.isRpcThrottleEnabled(c, req, done),
4104          resp -> resp.getRpcThrottleEnabled()))
4105      .call();
4106    return future;
4107  }
4108
4109  @Override
4110  public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
4111    CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
4112      .action((controller, stub) -> this.<SwitchExceedThrottleQuotaRequest,
4113        SwitchExceedThrottleQuotaResponse, Boolean> call(controller, stub,
4114          SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
4115            .build(),
4116          (s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
4117          resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
4118      .call();
4119    return future;
4120  }
4121
4122  @Override
4123  public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
4124    return this.<Map<TableName, Long>> newMasterCaller()
4125      .action((controller, stub) -> this.<GetSpaceQuotaRegionSizesRequest,
4126        GetSpaceQuotaRegionSizesResponse, Map<TableName, Long>> call(controller, stub,
4127          RequestConverter.buildGetSpaceQuotaRegionSizesRequest(),
4128          (s, c, req, done) -> s.getSpaceQuotaRegionSizes(c, req, done),
4129          resp -> resp.getSizesList().stream().collect(Collectors
4130            .toMap(sizes -> ProtobufUtil.toTableName(sizes.getTableName()), RegionSizes::getSize))))
4131      .call();
4132  }
4133
4134  @Override
4135  public CompletableFuture<Map<TableName, SpaceQuotaSnapshot>>
4136    getRegionServerSpaceQuotaSnapshots(ServerName serverName) {
4137    return this.<Map<TableName, SpaceQuotaSnapshot>> newAdminCaller()
4138      .action((controller, stub) -> this.<GetSpaceQuotaSnapshotsRequest,
4139        GetSpaceQuotaSnapshotsResponse, Map<TableName, SpaceQuotaSnapshot>> adminCall(controller,
4140          stub, RequestConverter.buildGetSpaceQuotaSnapshotsRequest(),
4141          (s, c, req, done) -> s.getSpaceQuotaSnapshots(controller, req, done),
4142          resp -> resp.getSnapshotsList().stream()
4143            .collect(Collectors.toMap(snapshot -> ProtobufUtil.toTableName(snapshot.getTableName()),
4144              snapshot -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(snapshot.getSnapshot())))))
4145      .serverName(serverName).call();
4146  }
4147
4148  private CompletableFuture<SpaceQuotaSnapshot>
4149    getCurrentSpaceQuotaSnapshot(Converter<SpaceQuotaSnapshot, GetQuotaStatesResponse> converter) {
4150    return this.<SpaceQuotaSnapshot> newMasterCaller()
4151      .action((controller, stub) -> this.<GetQuotaStatesRequest, GetQuotaStatesResponse,
4152        SpaceQuotaSnapshot> call(controller, stub, RequestConverter.buildGetQuotaStatesRequest(),
4153          (s, c, req, done) -> s.getQuotaStates(c, req, done), converter))
4154      .call();
4155  }
4156
4157  @Override
4158  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(String namespace) {
4159    return getCurrentSpaceQuotaSnapshot(resp -> resp.getNsSnapshotsList().stream()
4160      .filter(s -> s.getNamespace().equals(namespace)).findFirst()
4161      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4162  }
4163
4164  @Override
4165  public CompletableFuture<SpaceQuotaSnapshot> getCurrentSpaceQuotaSnapshot(TableName tableName) {
4166    HBaseProtos.TableName protoTableName = ProtobufUtil.toProtoTableName(tableName);
4167    return getCurrentSpaceQuotaSnapshot(resp -> resp.getTableSnapshotsList().stream()
4168      .filter(s -> s.getTableName().equals(protoTableName)).findFirst()
4169      .map(s -> SpaceQuotaSnapshot.toSpaceQuotaSnapshot(s.getSnapshot())).orElse(null));
4170  }
4171
4172  @Override
4173  public CompletableFuture<Void> grant(UserPermission userPermission,
4174    boolean mergeExistingPermissions) {
4175    return this.<Void> newMasterCaller()
4176      .action((controller, stub) -> this.<GrantRequest, GrantResponse, Void> call(controller, stub,
4177        ShadedAccessControlUtil.buildGrantRequest(userPermission, mergeExistingPermissions),
4178        (s, c, req, done) -> s.grant(c, req, done), resp -> null))
4179      .call();
4180  }
4181
4182  @Override
4183  public CompletableFuture<Void> revoke(UserPermission userPermission) {
4184    return this.<Void> newMasterCaller()
4185      .action((controller, stub) -> this.<RevokeRequest, RevokeResponse, Void> call(controller,
4186        stub, ShadedAccessControlUtil.buildRevokeRequest(userPermission),
4187        (s, c, req, done) -> s.revoke(c, req, done), resp -> null))
4188      .call();
4189  }
4190
4191  @Override
4192  public CompletableFuture<List<UserPermission>>
4193    getUserPermissions(GetUserPermissionsRequest getUserPermissionsRequest) {
4194    return this.<List<UserPermission>> newMasterCaller()
4195      .action((controller, stub) -> this.<AccessControlProtos.GetUserPermissionsRequest,
4196        GetUserPermissionsResponse, List<UserPermission>> call(controller, stub,
4197          ShadedAccessControlUtil.buildGetUserPermissionsRequest(getUserPermissionsRequest),
4198          (s, c, req, done) -> s.getUserPermissions(c, req, done),
4199          resp -> resp.getUserPermissionList().stream()
4200            .map(uPerm -> ShadedAccessControlUtil.toUserPermission(uPerm))
4201            .collect(Collectors.toList())))
4202      .call();
4203  }
4204
4205  @Override
4206  public CompletableFuture<List<Boolean>> hasUserPermissions(String userName,
4207    List<Permission> permissions) {
4208    return this.<List<Boolean>> newMasterCaller()
4209      .action((controller, stub) -> this.<HasUserPermissionsRequest, HasUserPermissionsResponse,
4210        List<Boolean>> call(controller, stub,
4211          ShadedAccessControlUtil.buildHasUserPermissionsRequest(userName, permissions),
4212          (s, c, req, done) -> s.hasUserPermissions(c, req, done),
4213          resp -> resp.getHasUserPermissionList()))
4214      .call();
4215  }
4216
4217  @Override
4218  public CompletableFuture<Boolean> snapshotCleanupSwitch(final boolean on, final boolean sync) {
4219    return this.<Boolean> newMasterCaller()
4220      .action((controller, stub) -> this.call(controller, stub,
4221        RequestConverter.buildSetSnapshotCleanupRequest(on, sync),
4222        MasterService.Interface::switchSnapshotCleanup,
4223        SetSnapshotCleanupResponse::getPrevSnapshotCleanup))
4224      .call();
4225  }
4226
4227  @Override
4228  public CompletableFuture<Boolean> isSnapshotCleanupEnabled() {
4229    return this.<Boolean> newMasterCaller()
4230      .action((controller, stub) -> this.call(controller, stub,
4231        RequestConverter.buildIsSnapshotCleanupEnabledRequest(),
4232        MasterService.Interface::isSnapshotCleanupEnabled,
4233        IsSnapshotCleanupEnabledResponse::getEnabled))
4234      .call();
4235  }
4236
4237  @Override
4238  public CompletableFuture<Void> moveServersToRSGroup(Set<Address> servers, String groupName) {
4239    return this.<Void> newMasterCaller()
4240      .action((controller, stub) -> this.<MoveServersRequest, MoveServersResponse, Void> call(
4241        controller, stub, RequestConverter.buildMoveServersRequest(servers, groupName),
4242        (s, c, req, done) -> s.moveServers(c, req, done), resp -> null))
4243      .call();
4244  }
4245
4246  @Override
4247  public CompletableFuture<Void> addRSGroup(String groupName) {
4248    return this.<Void> newMasterCaller()
4249      .action((controller, stub) -> this.<AddRSGroupRequest, AddRSGroupResponse, Void> call(
4250        controller, stub, AddRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4251        (s, c, req, done) -> s.addRSGroup(c, req, done), resp -> null))
4252      .call();
4253  }
4254
4255  @Override
4256  public CompletableFuture<Void> removeRSGroup(String groupName) {
4257    return this.<Void> newMasterCaller()
4258      .action((controller, stub) -> this.<RemoveRSGroupRequest, RemoveRSGroupResponse, Void> call(
4259        controller, stub, RemoveRSGroupRequest.newBuilder().setRSGroupName(groupName).build(),
4260        (s, c, req, done) -> s.removeRSGroup(c, req, done), resp -> null))
4261      .call();
4262  }
4263
4264  @Override
4265  public CompletableFuture<BalanceResponse> balanceRSGroup(String groupName,
4266    BalanceRequest request) {
4267    return this.<BalanceResponse> newMasterCaller()
4268      .action((controller, stub) -> this.<BalanceRSGroupRequest, BalanceRSGroupResponse,
4269        BalanceResponse> call(controller, stub,
4270          ProtobufUtil.createBalanceRSGroupRequest(groupName, request),
4271          MasterService.Interface::balanceRSGroup, ProtobufUtil::toBalanceResponse))
4272      .call();
4273  }
4274
4275  @Override
4276  public CompletableFuture<List<RSGroupInfo>> listRSGroups() {
4277    return this.<List<RSGroupInfo>> newMasterCaller()
4278      .action((controller, stub) -> this.<ListRSGroupInfosRequest, ListRSGroupInfosResponse,
4279        List<RSGroupInfo>> call(controller, stub, ListRSGroupInfosRequest.getDefaultInstance(),
4280          (s, c, req, done) -> s.listRSGroupInfos(c, req, done), resp -> resp.getRSGroupInfoList()
4281            .stream().map(r -> ProtobufUtil.toGroupInfo(r)).collect(Collectors.toList())))
4282      .call();
4283  }
4284
4285  private CompletableFuture<List<LogEntry>> getSlowLogResponses(
4286    final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
4287    final String logType) {
4288    if (CollectionUtils.isEmpty(serverNames)) {
4289      return CompletableFuture.completedFuture(Collections.emptyList());
4290    }
4291    return CompletableFuture.supplyAsync(() -> serverNames
4292      .stream().map((ServerName serverName) -> getSlowLogResponseFromServer(serverName,
4293        filterParams, limit, logType))
4294      .map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));
4295  }
4296
4297  private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
4298    Map<String, Object> filterParams, int limit, String logType) {
4299    return this.<List<LogEntry>> newAdminCaller()
4300      .action((controller, stub) -> this.adminCall(controller, stub,
4301        RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
4302        AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
4303      .serverName(serverName).call();
4304  }
4305
4306  @Override
4307  public CompletableFuture<List<Boolean>>
4308    clearSlowLogResponses(@Nullable Set<ServerName> serverNames) {
4309    if (CollectionUtils.isEmpty(serverNames)) {
4310      return CompletableFuture.completedFuture(Collections.emptyList());
4311    }
4312    List<CompletableFuture<Boolean>> clearSlowLogResponseList =
4313      serverNames.stream().map(this::clearSlowLogsResponses).collect(Collectors.toList());
4314    return convertToFutureOfList(clearSlowLogResponseList);
4315  }
4316
4317  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
4318    return this.<Boolean> newAdminCaller()
4319      .action((controller, stub) -> this.adminCall(controller, stub,
4320        RequestConverter.buildClearSlowLogResponseRequest(),
4321        AdminService.Interface::clearSlowLogsResponses, ProtobufUtil::toClearSlowLogPayload))
4322      .serverName(serverName).call();
4323  }
4324
4325  private static <T> CompletableFuture<List<T>>
4326    convertToFutureOfList(List<CompletableFuture<T>> futures) {
4327    CompletableFuture<Void> allDoneFuture =
4328      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
4329    return allDoneFuture
4330      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
4331  }
4332
4333  @Override
4334  public CompletableFuture<List<TableName>> listTablesInRSGroup(String groupName) {
4335    return this.<List<TableName>> newMasterCaller()
4336      .action((controller, stub) -> this.<ListTablesInRSGroupRequest, ListTablesInRSGroupResponse,
4337        List<TableName>> call(controller, stub,
4338          ListTablesInRSGroupRequest.newBuilder().setGroupName(groupName).build(),
4339          (s, c, req, done) -> s.listTablesInRSGroup(c, req, done), resp -> resp.getTableNameList()
4340            .stream().map(ProtobufUtil::toTableName).collect(Collectors.toList())))
4341      .call();
4342  }
4343
4344  @Override
4345  public CompletableFuture<Pair<List<String>, List<TableName>>>
4346    getConfiguredNamespacesAndTablesInRSGroup(String groupName) {
4347    return this.<Pair<List<String>, List<TableName>>> newMasterCaller()
4348      .action((controller, stub) -> this.<GetConfiguredNamespacesAndTablesInRSGroupRequest,
4349        GetConfiguredNamespacesAndTablesInRSGroupResponse,
4350        Pair<List<String>, List<TableName>>> call(controller, stub,
4351          GetConfiguredNamespacesAndTablesInRSGroupRequest.newBuilder().setGroupName(groupName)
4352            .build(),
4353          (s, c, req, done) -> s.getConfiguredNamespacesAndTablesInRSGroup(c, req, done),
4354          resp -> Pair.newPair(resp.getNamespaceList(), resp.getTableNameList().stream()
4355            .map(ProtobufUtil::toTableName).collect(Collectors.toList()))))
4356      .call();
4357  }
4358
4359  @Override
4360  public CompletableFuture<RSGroupInfo> getRSGroup(Address hostPort) {
4361    return this.<RSGroupInfo> newMasterCaller()
4362      .action((controller, stub) -> this.<GetRSGroupInfoOfServerRequest,
4363        GetRSGroupInfoOfServerResponse, RSGroupInfo> call(controller, stub,
4364          GetRSGroupInfoOfServerRequest.newBuilder()
4365            .setServer(HBaseProtos.ServerName.newBuilder().setHostName(hostPort.getHostname())
4366              .setPort(hostPort.getPort()).build())
4367            .build(),
4368          (s, c, req, done) -> s.getRSGroupInfoOfServer(c, req, done),
4369          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4370      .call();
4371  }
4372
4373  @Override
4374  public CompletableFuture<Void> removeServersFromRSGroup(Set<Address> servers) {
4375    return this.<Void> newMasterCaller()
4376      .action((controller, stub) -> this.<RemoveServersRequest, RemoveServersResponse, Void> call(
4377        controller, stub, RequestConverter.buildRemoveServersRequest(servers),
4378        (s, c, req, done) -> s.removeServers(c, req, done), resp -> null))
4379      .call();
4380  }
4381
4382  @Override
4383  public CompletableFuture<Void> setRSGroup(Set<TableName> tables, String groupName) {
4384    CompletableFuture<Void> future = new CompletableFuture<>();
4385    for (TableName tableName : tables) {
4386      addListener(tableExists(tableName), (exist, err) -> {
4387        if (err != null) {
4388          future.completeExceptionally(err);
4389          return;
4390        }
4391        if (!exist) {
4392          future.completeExceptionally(new TableNotFoundException(tableName));
4393          return;
4394        }
4395      });
4396    }
4397    addListener(listTableDescriptors(new ArrayList<>(tables)), (tableDescriptions, err) -> {
4398      if (err != null) {
4399        future.completeExceptionally(err);
4400        return;
4401      }
4402      if (tableDescriptions == null || tableDescriptions.isEmpty()) {
4403        future.complete(null);
4404        return;
4405      }
4406      List<TableDescriptor> newTableDescriptors = new ArrayList<>();
4407      for (TableDescriptor td : tableDescriptions) {
4408        newTableDescriptors
4409          .add(TableDescriptorBuilder.newBuilder(td).setRegionServerGroup(groupName).build());
4410      }
4411      addListener(
4412        CompletableFuture.allOf(
4413          newTableDescriptors.stream().map(this::modifyTable).toArray(CompletableFuture[]::new)),
4414        (v, e) -> {
4415          if (e != null) {
4416            future.completeExceptionally(e);
4417          } else {
4418            future.complete(v);
4419          }
4420        });
4421    });
4422    return future;
4423  }
4424
4425  @Override
4426  public CompletableFuture<RSGroupInfo> getRSGroup(TableName table) {
4427    return this.<RSGroupInfo> newMasterCaller()
4428      .action((controller, stub) -> this.<GetRSGroupInfoOfTableRequest,
4429        GetRSGroupInfoOfTableResponse, RSGroupInfo> call(controller, stub,
4430          GetRSGroupInfoOfTableRequest.newBuilder()
4431            .setTableName(ProtobufUtil.toProtoTableName(table)).build(),
4432          (s, c, req, done) -> s.getRSGroupInfoOfTable(c, req, done),
4433          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4434      .call();
4435  }
4436
4437  @Override
4438  public CompletableFuture<RSGroupInfo> getRSGroup(String groupName) {
4439    return this.<RSGroupInfo> newMasterCaller()
4440      .action((controller, stub) -> this.<GetRSGroupInfoRequest, GetRSGroupInfoResponse,
4441        RSGroupInfo> call(controller, stub,
4442          GetRSGroupInfoRequest.newBuilder().setRSGroupName(groupName).build(),
4443          (s, c, req, done) -> s.getRSGroupInfo(c, req, done),
4444          resp -> resp.hasRSGroupInfo() ? ProtobufUtil.toGroupInfo(resp.getRSGroupInfo()) : null))
4445      .call();
4446  }
4447
4448  @Override
4449  public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
4450    return this.<Void> newMasterCaller()
4451      .action((controller, stub) -> this.<RenameRSGroupRequest, RenameRSGroupResponse, Void> call(
4452        controller, stub, RenameRSGroupRequest.newBuilder().setOldRsgroupName(oldName)
4453          .setNewRsgroupName(newName).build(),
4454        (s, c, req, done) -> s.renameRSGroup(c, req, done), resp -> null))
4455      .call();
4456  }
4457
4458  @Override
4459  public CompletableFuture<Void> updateRSGroupConfig(String groupName,
4460    Map<String, String> configuration) {
4461    UpdateRSGroupConfigRequest.Builder request =
4462      UpdateRSGroupConfigRequest.newBuilder().setGroupName(groupName);
4463    if (configuration != null) {
4464      configuration.entrySet().forEach(e -> request.addConfiguration(
4465        NameStringPair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build()));
4466    }
4467    return this.<Void> newMasterCaller()
4468      .action((controller, stub) -> this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse,
4469        Void> call(controller, stub, request.build(),
4470          (s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
4471      .call();
4472  }
4473
4474  private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
4475    return this.<List<LogEntry>> newMasterCaller()
4476      .action((controller, stub) -> this.call(controller, stub,
4477        ProtobufUtil.toBalancerDecisionRequest(limit), MasterService.Interface::getLogEntries,
4478        ProtobufUtil::toBalancerDecisionResponse))
4479      .call();
4480  }
4481
4482  private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
4483    return this.<List<LogEntry>> newMasterCaller()
4484      .action((controller, stub) -> this.call(controller, stub,
4485        ProtobufUtil.toBalancerRejectionRequest(limit), MasterService.Interface::getLogEntries,
4486        ProtobufUtil::toBalancerRejectionResponse))
4487      .call();
4488  }
4489
4490  @Override
4491  public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
4492    String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
4493    if (logType == null || serverType == null) {
4494      throw new IllegalArgumentException("logType and/or serverType cannot be empty");
4495    }
4496    switch (logType) {
4497      case "SLOW_LOG":
4498      case "LARGE_LOG":
4499        if (ServerType.MASTER.equals(serverType)) {
4500          throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
4501        }
4502        return getSlowLogResponses(filterParams, serverNames, limit, logType);
4503      case "BALANCER_DECISION":
4504        if (ServerType.REGION_SERVER.equals(serverType)) {
4505          throw new IllegalArgumentException(
4506            "Balancer Decision logs are not maintained by HRegionServer");
4507        }
4508        return getBalancerDecisions(limit);
4509      case "BALANCER_REJECTION":
4510        if (ServerType.REGION_SERVER.equals(serverType)) {
4511          throw new IllegalArgumentException(
4512            "Balancer Rejection logs are not maintained by HRegionServer");
4513        }
4514        return getBalancerRejections(limit);
4515      default:
4516        return CompletableFuture.completedFuture(Collections.emptyList());
4517    }
4518  }
4519
4520  @Override
4521  public CompletableFuture<Void> flushMasterStore() {
4522    FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
4523    return this.<Void> newMasterCaller()
4524      .action((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
4525        Void> call(controller, stub, request.build(),
4526          (s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
4527      .call();
4528  }
4529
4530  @Override
4531  public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
4532    GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
4533    return this.<List<String>> newAdminCaller()
4534      .action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
4535        List<String>> adminCall(controller, stub, request.build(),
4536          (s, c, req, done) -> s.getCachedFilesList(c, req, done),
4537          resp -> resp.getCachedFilesList()))
4538      .serverName(serverName).call();
4539  }
4540}