001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.net.InetAddress;
026import java.util.HashSet;
027import java.util.Optional;
028import java.util.Set;
029
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.ipc.RpcCall;
034import org.apache.hadoop.hbase.ipc.RpcCallback;
035import org.apache.hadoop.hbase.ipc.RpcServer;
036import org.apache.hadoop.hbase.procedure2.Procedure;
037import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
038import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.hadoop.hbase.testclassification.MasterTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
052import org.apache.hbase.thirdparty.com.google.protobuf.Message;
053
054import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
056
057@Category({ MasterTests.class, SmallTests.class })
058public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestRegionProcedureStore.class);
063
064  private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class);
065
066  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
067    LOG.debug("expected: " + procIds);
068    LoadCounter loader = new LoadCounter();
069    ProcedureTestingUtility.storeRestart(store, true, loader);
070    assertEquals(procIds.size(), loader.getLoadedCount());
071    assertEquals(0, loader.getCorruptedCount());
072  }
073
074  @Test
075  public void testLoad() throws Exception {
076    Set<Long> procIds = new HashSet<>();
077
078    // Insert something in the log
079    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
080    procIds.add(proc1.getProcId());
081    store.insert(proc1, null);
082
083    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
084    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
085    proc3.setParent(proc2);
086    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
087    proc4.setParent(proc2);
088
089    procIds.add(proc2.getProcId());
090    procIds.add(proc3.getProcId());
091    procIds.add(proc4.getProcId());
092    store.insert(proc2, new Procedure[] { proc3, proc4 });
093
094    // Verify that everything is there
095    verifyProcIdsOnRestart(procIds);
096
097    // Update and delete something
098    proc1.finish();
099    store.update(proc1);
100    proc4.finish();
101    store.update(proc4);
102    store.delete(proc4.getProcId());
103    procIds.remove(proc4.getProcId());
104
105    // Verify that everything is there
106    verifyProcIdsOnRestart(procIds);
107  }
108
109  @Test
110  public void testCleanup() throws Exception {
111    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
112    store.insert(proc1, null);
113    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
114    store.insert(proc2, null);
115    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
116    store.insert(proc3, null);
117    LoadCounter loader = new LoadCounter();
118    store.load(loader);
119    assertEquals(proc3.getProcId(), loader.getMaxProcId());
120    assertEquals(3, loader.getRunnableCount());
121
122    store.delete(proc3.getProcId());
123    store.delete(proc2.getProcId());
124    loader = new LoadCounter();
125    store.load(loader);
126    assertEquals(proc3.getProcId(), loader.getMaxProcId());
127    assertEquals(1, loader.getRunnableCount());
128
129    // the row should still be there
130    assertTrue(store.region
131      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
132    assertTrue(store.region
133      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
134
135    // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
136    // id
137    store.cleanup();
138    assertTrue(store.region
139      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
140    assertFalse(store.region
141      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
142
143    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
144    store.insert(proc4, null);
145    store.cleanup();
146    // proc3 should also be deleted as now proc4 holds the max proc id
147    assertFalse(store.region
148      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
149  }
150
151  /**
152   * Test for HBASE-23895
153   */
154  @Test
155  public void testInsertWithRpcCall() throws Exception {
156    RpcServer.setCurrentCall(newRpcCallWithDeadline());
157    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
158    store.insert(proc1, null);
159    RpcServer.setCurrentCall(null);
160  }
161
162  private RpcCall newRpcCallWithDeadline() {
163    return new RpcCall() {
164      @Override
165      public long getDeadline() {
166        return EnvironmentEdgeManager.currentTime();
167      }
168
169      @Override
170      public BlockingService getService() {
171        return null;
172      }
173
174      @Override
175      public Descriptors.MethodDescriptor getMethod() {
176        return null;
177      }
178
179      @Override
180      public Message getParam() {
181        return null;
182      }
183
184      @Override
185      public CellScanner getCellScanner() {
186        return null;
187      }
188
189      @Override
190      public long getReceiveTime() {
191        return 0;
192      }
193
194      @Override
195      public long getStartTime() {
196        return 0;
197      }
198
199      @Override
200      public void setStartTime(long startTime) {
201
202      }
203
204      @Override
205      public int getTimeout() {
206        return 0;
207      }
208
209      @Override
210      public int getPriority() {
211        return 0;
212      }
213
214      @Override
215      public long getSize() {
216        return 0;
217      }
218
219      @Override
220      public RPCProtos.RequestHeader getHeader() {
221        return null;
222      }
223
224      @Override
225      public int getRemotePort() {
226        return 0;
227      }
228
229      @Override
230      public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
231        String error) {
232      }
233
234      @Override
235      public void sendResponseIfReady() throws IOException {
236      }
237
238      @Override
239      public void cleanup() {
240      }
241
242      @Override
243      public String toShortString() {
244        return null;
245      }
246
247      @Override
248      public long disconnectSince() {
249        return 0;
250      }
251
252      @Override
253      public boolean isClientCellBlockSupported() {
254        return false;
255      }
256
257      @Override
258      public Optional<User> getRequestUser() {
259        return Optional.empty();
260      }
261
262      @Override
263      public InetAddress getRemoteAddress() {
264        return null;
265      }
266
267      @Override
268      public HBaseProtos.VersionInfo getClientVersionInfo() {
269        return null;
270      }
271
272      @Override
273      public void setCallBack(RpcCallback callback) {
274      }
275
276      @Override
277      public boolean isRetryImmediatelySupported() {
278        return false;
279      }
280
281      @Override
282      public long getResponseCellSize() {
283        return 0;
284      }
285
286      @Override
287      public void incrementResponseCellSize(long cellSize) {
288      }
289
290      @Override
291      public long getResponseBlockSize() {
292        return 0;
293      }
294
295      @Override
296      public void incrementResponseBlockSize(long blockSize) {
297      }
298
299      @Override
300      public long getResponseExceptionSize() {
301        return 0;
302      }
303
304      @Override
305      public void incrementResponseExceptionSize(long exceptionSize) {
306      }
307    };
308  }
309}