001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.region;
019
020import static org.hamcrest.CoreMatchers.startsWith;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.SortedSet;
030import java.util.TreeSet;
031import org.apache.commons.lang3.mutable.MutableLong;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
037import org.apache.hadoop.hbase.HBaseIOException;
038import org.apache.hadoop.hbase.Server;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.RegionInfoBuilder;
041import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
042import org.apache.hadoop.hbase.master.region.MasterRegion;
043import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
045import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
048import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
049import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
050import org.apache.hadoop.hbase.testclassification.MasterTests;
051import org.apache.hadoop.hbase.testclassification.SmallTests;
052import org.apache.hadoop.hbase.util.CommonFSUtils;
053import org.junit.After;
054import org.junit.Before;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058
059@SuppressWarnings("deprecation")
060@Category({ MasterTests.class, SmallTests.class })
061public class TestRegionProcedureStoreMigration {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065    HBaseClassTestRule.forClass(TestRegionProcedureStoreMigration.class);
066
067  private HBaseCommonTestingUtility htu;
068
069  private Server server;
070
071  private MasterRegion region;
072
073  private RegionProcedureStore store;
074
075  private WALProcedureStore walStore;
076
077  @Before
078  public void setUp() throws IOException {
079    htu = new HBaseCommonTestingUtility();
080    Configuration conf = htu.getConfiguration();
081    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
082    // Runs on local filesystem. Test does not need sync. Turn off checks.
083    conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
084    Path testDir = htu.getDataTestDir();
085    CommonFSUtils.setRootDir(conf, testDir);
086    walStore = new WALProcedureStore(conf, new LeaseRecovery() {
087
088      @Override
089      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
090      }
091    });
092    walStore.start(1);
093    walStore.recoverLease();
094    walStore.load(new LoadCounter());
095    server = RegionProcedureStoreTestHelper.mockServer(conf);
096    region = MasterRegionFactory.create(server);
097  }
098
099  @After
100  public void tearDown() throws IOException {
101    if (store != null) {
102      store.stop(true);
103    }
104    region.close(true);
105    walStore.stop(true);
106    htu.cleanupTestDir();
107  }
108
109  @Test
110  public void test() throws IOException {
111    List<RegionProcedureStoreTestProcedure> procs = new ArrayList<>();
112    for (int i = 0; i < 10; i++) {
113      RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
114      walStore.insert(proc, null);
115      procs.add(proc);
116    }
117    for (int i = 5; i < 10; i++) {
118      walStore.delete(procs.get(i).getProcId());
119    }
120    walStore.stop(true);
121    SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
122      new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
123    MutableLong maxProcIdSet = new MutableLong(0);
124    store = RegionProcedureStoreTestHelper.createStore(server, region, new ProcedureLoader() {
125
126      @Override
127      public void setMaxProcId(long maxProcId) {
128        maxProcIdSet.setValue(maxProcId);
129      }
130
131      @Override
132      public void load(ProcedureIterator procIter) throws IOException {
133        while (procIter.hasNext()) {
134          RegionProcedureStoreTestProcedure proc =
135            (RegionProcedureStoreTestProcedure) procIter.next();
136          loadedProcs.add(proc);
137        }
138      }
139
140      @Override
141      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
142        if (procIter.hasNext()) {
143          fail("Found corrupted procedures");
144        }
145      }
146    });
147    assertEquals(10, maxProcIdSet.longValue());
148    assertEquals(5, loadedProcs.size());
149    int procId = 1;
150    for (RegionProcedureStoreTestProcedure proc : loadedProcs) {
151      assertEquals(procId, proc.getProcId());
152      procId++;
153    }
154    Path testDir = htu.getDataTestDir();
155    FileSystem fs = testDir.getFileSystem(htu.getConfiguration());
156    Path oldProcWALDir = new Path(testDir, WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
157    // make sure the old proc wal directory has been deleted.
158    assertFalse(fs.exists(oldProcWALDir));
159  }
160
161  @Test
162  public void testMigrateWithUnsupportedProcedures() throws IOException {
163    AssignProcedure assignProc = new AssignProcedure();
164    assignProc.setProcId(1L);
165    assignProc.setRegionInfo(RegionInfoBuilder.newBuilder(TableName.valueOf("table")).build());
166    walStore.insert(assignProc, null);
167    walStore.stop(true);
168
169    try {
170      store = RegionProcedureStoreTestHelper.createStore(server, region, new LoadCounter());
171      fail("Should fail since AssignProcedure is not supported");
172    } catch (HBaseIOException e) {
173      assertThat(e.getMessage(), startsWith("Unsupported"));
174    }
175  }
176}