001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.net.InetAddress;
026import java.util.HashSet;
027import java.util.Map;
028import java.util.Optional;
029import java.util.Set;
030import org.apache.hadoop.hbase.CellScanner;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.ipc.RpcCall;
034import org.apache.hadoop.hbase.ipc.RpcCallback;
035import org.apache.hadoop.hbase.ipc.RpcServer;
036import org.apache.hadoop.hbase.procedure2.Procedure;
037import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
038import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
039import org.apache.hadoop.hbase.security.User;
040import org.apache.hadoop.hbase.testclassification.MasterTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
051import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
052import org.apache.hbase.thirdparty.com.google.protobuf.Message;
053
054import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
056
057@Category({ MasterTests.class, SmallTests.class })
058public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestRegionProcedureStore.class);
063
064  private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class);
065
066  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
067    LOG.debug("expected: " + procIds);
068    LoadCounter loader = new LoadCounter();
069    ProcedureTestingUtility.storeRestart(store, true, loader);
070    assertEquals(procIds.size(), loader.getLoadedCount());
071    assertEquals(0, loader.getCorruptedCount());
072  }
073
074  @Test
075  public void testLoad() throws Exception {
076    Set<Long> procIds = new HashSet<>();
077
078    // Insert something in the log
079    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
080    procIds.add(proc1.getProcId());
081    store.insert(proc1, null);
082
083    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
084    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
085    proc3.setParent(proc2);
086    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
087    proc4.setParent(proc2);
088
089    procIds.add(proc2.getProcId());
090    procIds.add(proc3.getProcId());
091    procIds.add(proc4.getProcId());
092    store.insert(proc2, new Procedure[] { proc3, proc4 });
093
094    // Verify that everything is there
095    verifyProcIdsOnRestart(procIds);
096
097    // Update and delete something
098    proc1.finish();
099    store.update(proc1);
100    proc4.finish();
101    store.update(proc4);
102    store.delete(proc4.getProcId());
103    procIds.remove(proc4.getProcId());
104
105    // Verify that everything is there
106    verifyProcIdsOnRestart(procIds);
107  }
108
109  @Test
110  public void testCleanup() throws Exception {
111    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
112    store.insert(proc1, null);
113    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
114    store.insert(proc2, null);
115    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
116    store.insert(proc3, null);
117    LoadCounter loader = new LoadCounter();
118    store.load(loader);
119    assertEquals(proc3.getProcId(), loader.getMaxProcId());
120    assertEquals(3, loader.getRunnableCount());
121
122    store.delete(proc3.getProcId());
123    store.delete(proc2.getProcId());
124    loader = new LoadCounter();
125    store.load(loader);
126    assertEquals(proc3.getProcId(), loader.getMaxProcId());
127    assertEquals(1, loader.getRunnableCount());
128
129    // the row should still be there
130    assertTrue(store.region
131      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
132    assertTrue(store.region
133      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
134
135    // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
136    // id
137    store.cleanup();
138    assertTrue(store.region
139      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
140    assertFalse(store.region
141      .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
142
143    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
144    store.insert(proc4, null);
145    store.cleanup();
146    // proc3 should also be deleted as now proc4 holds the max proc id
147    assertFalse(store.region
148      .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
149  }
150
151  /**
152   * Test for HBASE-23895
153   */
154  @Test
155  public void testInsertWithRpcCall() throws Exception {
156    RpcServer.setCurrentCall(newRpcCallWithDeadline());
157    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
158    store.insert(proc1, null);
159    RpcServer.setCurrentCall(null);
160  }
161
162  @SuppressWarnings("checkstyle:methodlength")
163  private RpcCall newRpcCallWithDeadline() {
164    return new RpcCall() {
165      @Override
166      public long getDeadline() {
167        return EnvironmentEdgeManager.currentTime();
168      }
169
170      @Override
171      public BlockingService getService() {
172        return null;
173      }
174
175      @Override
176      public Descriptors.MethodDescriptor getMethod() {
177        return null;
178      }
179
180      @Override
181      public Message getParam() {
182        return null;
183      }
184
185      @Override
186      public CellScanner getCellScanner() {
187        return null;
188      }
189
190      @Override
191      public long getReceiveTime() {
192        return 0;
193      }
194
195      @Override
196      public long getStartTime() {
197        return 0;
198      }
199
200      @Override
201      public void setStartTime(long startTime) {
202
203      }
204
205      @Override
206      public int getTimeout() {
207        return 0;
208      }
209
210      @Override
211      public int getPriority() {
212        return 0;
213      }
214
215      @Override
216      public long getSize() {
217        return 0;
218      }
219
220      @Override
221      public RPCProtos.RequestHeader getHeader() {
222        return null;
223      }
224
225      @Override
226      public Map<String, byte[]> getConnectionAttributes() {
227        return null;
228      }
229
230      @Override
231      public Map<String, byte[]> getRequestAttributes() {
232        return null;
233      }
234
235      @Override
236      public byte[] getRequestAttribute(String key) {
237        return null;
238      }
239
240      @Override
241      public int getRemotePort() {
242        return 0;
243      }
244
245      @Override
246      public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
247        String error) {
248      }
249
250      @Override
251      public void sendResponseIfReady() throws IOException {
252      }
253
254      @Override
255      public void cleanup() {
256      }
257
258      @Override
259      public String toShortString() {
260        return null;
261      }
262
263      @Override
264      public long disconnectSince() {
265        return 0;
266      }
267
268      @Override
269      public boolean isClientCellBlockSupported() {
270        return false;
271      }
272
273      @Override
274      public Optional<User> getRequestUser() {
275        return Optional.empty();
276      }
277
278      @Override
279      public InetAddress getRemoteAddress() {
280        return null;
281      }
282
283      @Override
284      public HBaseProtos.VersionInfo getClientVersionInfo() {
285        return null;
286      }
287
288      @Override
289      public void setCallBack(RpcCallback callback) {
290      }
291
292      @Override
293      public boolean isRetryImmediatelySupported() {
294        return false;
295      }
296
297      @Override
298      public long getResponseCellSize() {
299        return 0;
300      }
301
302      @Override
303      public void incrementResponseCellSize(long cellSize) {
304      }
305
306      @Override
307      public long getBlockBytesScanned() {
308        return 0;
309      }
310
311      @Override
312      public void incrementBlockBytesScanned(long blockSize) {
313      }
314
315      @Override
316      public long getResponseExceptionSize() {
317        return 0;
318      }
319
320      @Override
321      public void incrementResponseExceptionSize(long exceptionSize) {
322      }
323    };
324  }
325}