BerkeleyDB学习笔记

简介

BerkeleyDB(BDB)是一个高效的嵌入式数据库,C、C++、Java、Perl、Python等很多其他语言都有对应的API。Berkeley DB Java Edition (JE) 完全用JAVA写的,它适合于管理海量的,简单的数据。具有如下特点:

  • 能够高效率的处理1到1百万条记录,制约JE数据库的往往是硬件系统,而不是JE本身。
  • 多线程支持,JE使用超时的方式来处理线程间的死锁问题。
  • Database都采用简单的key/value对应的形式。
  • 支持事务。
  • 允许创建二级库,可以方便的使用一级key、二级key来访问数据。
  • 支持RAM缓冲,减少频繁的IO操作。
  • 支持日志。
  • 数据备份和恢复。
  • 支持游标。

BDB(JE)的入门文档请参见 《Getting Started With BerkeleyDB Java Edition》,版本有点老了,但是基本够用,因为API基本未怎么变动。

BDB增删改查的两种方式

下面通过代码来演示BDB的基本操作。

引入BDB(JE)的pom

1
2
3
4
5
<!-- Maven dependency on the JE artifact (com.sleepycat:je), pinned to version 5.0.73. -->
<dependency>
<groupId>com.sleepycat</groupId>
<artifactId>je</artifactId>
<version>5.0.73</version>
</dependency>

定义接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.List;

public interface BerkeleyDBDao<T> {

    /**
     * Opens the database.
     *
     * @param filePath storage path of the database
     * @param dbName   name of the database
     */
    void openConnection(String filePath, String dbName);

    /**
     * Closes the database.
     */
    void closeConnection();

    /**
     * Insert operation.
     *
     * @param key   key of the record
     * @param value value of the record
     */
    void save(String key, T value);

    /**
     * Delete operation.
     *
     * @param key key of the record
     */
    void delete(String key);

    /**
     * Update operation.
     *
     * @param key   key of the record
     * @param value value of the record
     */
    void update(String key, T value);

    /**
     * Select-one operation.
     *
     * @param key key of the record
     * @return the value selected for {@code key}
     */
    T get(String key);
}

基于StoredMap的实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import java.io.File;
import java.util.Collections;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.collections.StoredMap;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;

/**
 * {@link BerkeleydbDao} implementation that reads and writes records through a
 * {@link StoredMap} view of the database.
 *
 * <p>Keys and values are both serialized with {@link SerialBinding}. The class
 * catalog required by that binding is kept in a second database named
 * {@code java_class_catalog} inside the same environment.
 *
 * @param <T> type of the stored values
 */
public class BerkeleydbDaoSortedMapImpl<T> implements BerkeleydbDao<T> {
    private static final String CLASS_CATALOG = "java_class_catalog";

    private final Class<T> persistentClass;

    private Environment env = null;
    private Database db = null;
    /** Handle of the class-catalog database; it must be closed before the environment. */
    private Database catalogDb = null;
    private StoredMap<String, T> storedMap = null;

    /**
     * @param persistentClass class of the values stored by this DAO; used to
     *                        build the value binding
     */
    public BerkeleydbDaoSortedMapImpl(Class<T> persistentClass) {
        this.persistentClass = persistentClass;
    }

    /**
     * Opens (creating them if necessary) the environment directory, the
     * environment, the data database and the class-catalog database, then
     * builds the writable {@link StoredMap} view.
     *
     * <p>If any step fails, every handle opened so far is closed again and the
     * original exception is rethrown.
     *
     * @param filePath directory that holds the environment files
     * @param dbName   name of the data database
     * @throws IllegalStateException if the directory cannot be created
     */
    @Override
    public void openConnection(String filePath, String dbName) {
        File file = new File(filePath);
        if (!file.isDirectory() && !file.mkdirs()) {
            throw new IllegalStateException("Cannot create database directory: " + filePath);
        }
        try {
            EnvironmentConfig envConf = new EnvironmentConfig();
            envConf.setAllowCreate(true);
            envConf.setTransactional(true);
            this.env = new Environment(file, envConf);

            DatabaseConfig dbConf = new DatabaseConfig();
            dbConf.setAllowCreate(true);
            dbConf.setTransactional(true);
            this.db = this.env.openDatabase(null, dbName, dbConf);

            // The serialization metadata (class catalog) is persisted in its own database.
            this.catalogDb = this.env.openDatabase(null, CLASS_CATALOG, dbConf);
            StoredClassCatalog catalog = new StoredClassCatalog(this.catalogDb);

            EntryBinding<String> keyBinding = new SerialBinding<String>(catalog, String.class);
            SerialBinding<T> valueBinding = new SerialBinding<T>(catalog, this.persistentClass);
            this.storedMap = new StoredMap<String, T>(this.db, keyBinding, valueBinding, true);
        } catch (RuntimeException e) {
            // Do not leave a half-open connection behind; report the original failure.
            try {
                closeConnection();
            } catch (RuntimeException ignored) {
                // The failure that aborted the open is the one worth reporting.
            }
            throw e;
        }
    }

    /**
     * Closes the data database, the class-catalog database and finally the
     * environment. Safe to call when nothing (or only part) has been opened,
     * and safe to call more than once.
     */
    @Override
    public void closeConnection() {
        storedMap = null;
        try {
            closeDatabases();
        } finally {
            if (env != null) {
                Environment closing = env;
                env = null;
                try {
                    // Clean the log before closing the environment to free more disk space.
                    closing.cleanLog();
                } finally {
                    closing.close();
                }
            }
        }
    }

    /** Closes both database handles, attempting the second even if the first close fails. */
    private void closeDatabases() {
        try {
            if (db != null) {
                db.close();
            }
        } finally {
            db = null;
            if (catalogDb != null) {
                Database closing = catalogDb;
                catalogDb = null;
                closing.close();
            }
        }
    }

    /** Stores {@code value} under {@code key} via {@code StoredMap.put}. */
    @Override
    public void save(String key, T value) {
        storedMap.put(key, value);
    }

    /** Removes the entry stored under {@code key}. */
    @Override
    public void delete(String key) {
        storedMap.remove(key);
    }

    /** Delegates to {@link #save(String, Object)}. */
    @Override
    public void update(String key, T value) {
        save(key, value);
    }

    /** Returns the result of {@code StoredMap.get} for {@code key}. */
    @Override
    public T get(String key) {
        return storedMap.get(key);
    }

    /**
     * Returns a snapshot copy of all stored values.
     *
     * <p>Deliberately not annotated with {@code @Override}: the DAO interface
     * shown alongside this class does not declare {@code getList()}.
     */
    public java.util.List<T> getList() {
        return new java.util.ArrayList<T>(storedMap.values());
    }
}

基于database的实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;

import dao.BerkeleydbDao;

/**
 * {@link BerkeleydbDao} implementation that operates on the {@link Database}
 * object directly.
 *
 * <p>Keys are stored as the UTF-8 bytes of the key string. Values are stored in
 * Java's native serialization format, so value objects must be serializable.
 *
 * @param <T> type of the stored values
 */
public class BerkeleydbDatabaseObjectImpl<T> implements BerkeleydbDao<T> {

    /**
     * Fixed charset for key encoding, so the same key maps to the same bytes
     * regardless of the platform default charset.
     */
    private static final java.nio.charset.Charset KEY_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    Environment env = null;
    private Database database = null;

    /**
     * Opens (creating them if necessary) the environment directory, the
     * environment and the database.
     *
     * @param filePath     directory that holds the environment files
     * @param databaseName name of the database
     * @throws IllegalStateException if the directory cannot be created
     */
    @Override
    public void openConnection(String filePath, String databaseName) {
        File file = new File(filePath);
        if (!file.isDirectory() && !file.mkdirs()) {
            throw new IllegalStateException("Cannot create database directory: " + filePath);
        }
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setTransactional(true);
        env = new Environment(file, envConfig);
        try {
            DatabaseConfig databaseConfig = new DatabaseConfig();
            databaseConfig.setAllowCreate(true);
            databaseConfig.setTransactional(true);
            database = env.openDatabase(null, databaseName, databaseConfig);
        } catch (RuntimeException e) {
            // Do not leak the environment when the database cannot be opened.
            try {
                closeConnection();
            } catch (RuntimeException ignored) {
                // The failure that aborted the open is the one worth reporting.
            }
            throw e;
        }
    }

    /**
     * Closes the database and then the environment. The environment is closed
     * even when the database was never opened or its close fails. Safe to call
     * more than once.
     */
    @Override
    public void closeConnection() {
        try {
            if (database != null) {
                database.close();
            }
        } finally {
            database = null;
            if (env != null) {
                Environment closing = env;
                env = null;
                try {
                    closing.cleanLog();
                } finally {
                    closing.close();
                }
            }
        }
    }

    /** Deletes the record stored under {@code key}. */
    @Override
    public void delete(String key) {
        database.delete(null, toKeyEntry(key));
    }

    /**
     * Reads and deserializes the value stored under {@code key}.
     *
     * @return the stored value, or {@code null} when the lookup does not
     *         report {@code OperationStatus.SUCCESS}
     * @throws RuntimeException if the stored bytes cannot be deserialized
     */
    @SuppressWarnings("unchecked")
    @Override
    public T get(String key) {
        DatabaseEntry valueEntry = new DatabaseEntry();
        OperationStatus status = database.get(null, toKeyEntry(key), valueEntry, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS) {
            return null;
        }
        try {
            ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(valueEntry.getData()));
            try {
                // Unchecked cast: the stored type is only known to the caller.
                return (T) ois.readObject();
            } finally {
                ois.close();
            }
        } catch (IOException e) {
            throw new RuntimeException("Cannot deserialize value for key: " + key, e);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Cannot deserialize value for key: " + key, e);
        }
    }

    /**
     * Serializes {@code t} and stores it under {@code key}.
     *
     * @throws RuntimeException if {@code t} cannot be serialized; nothing is
     *                          written to the database in that case
     */
    @Override
    public void save(String key, T t) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            try {
                oos.writeObject(t);
            } finally {
                // Closing flushes any buffered bytes into baos.
                oos.close();
            }
        } catch (IOException e) {
            // Fail instead of persisting a truncated or empty value.
            throw new RuntimeException("Cannot serialize value for key: " + key, e);
        }
        database.put(null, toKeyEntry(key), new DatabaseEntry(baos.toByteArray()));
    }

    /** Delegates to {@link #save(String, Object)}. */
    @Override
    public void update(String key, T t) {
        save(key, t);
    }

    /** Wraps the UTF-8 bytes of {@code key} in a {@link DatabaseEntry}. */
    private static DatabaseEntry toKeyEntry(String key) {
        return new DatabaseEntry(key.getBytes(KEY_CHARSET));
    }
}

需要强调的是,两种实现方式存储的数据结构是不一样的,故两者存储的数据不能互相兼容使用。总体来看,第一种实现方式简单明了,就像操作Map一样。

BDB二级索引

BDB是使用k-v存储的,默认就是一个key对应一个value,也就是说以key为索引。但是BDB是支持二级索引的,即Secondary Database,这个在官方文档有。不过它没提到的是,BDB支持多个二级索引。下面的代码,演示了如何创建多个二级索引,插入数据,然后通过二级索引查询数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import com.sleepycat.je.*;
import java.io.File;

/**
 * Demonstrates a primary database with two secondary indexes: writing through
 * the primary, querying through each secondary, and a join query across both.
 */
public class SimpleSecIndex {
    // Java does not expand "~"; resolve the home directory explicitly.
    private static final String dataPath =
            System.getProperty("user.home") + File.separator + "data";
    private static final String mainDbName = "mainDB";
    public static final String secDbName1 = "secDB1";
    public static final String secDbName2 = "secDB2";
    public static final String UTF8 = "UTF-8";
    private static final java.nio.charset.Charset UTF8_CHARSET =
            java.nio.charset.Charset.forName(UTF8);

    private Database mainDb;
    private SecondaryDatabase secDb1, secDb2;
    private Environment dbEnv;

    /**
     * Writes the sample data. The resulting layout is:
     * <pre>
     * data:
     * {
     *   1, name1, no1, birthday1;
     *   2, name2, no2, birthday2;
     *   3, name3, no3, birthday3;
     *   ...
     * }
     *
     * primary index: 1,2,3...
     * sec index1: name1--&gt;1, name2--&gt;2, ...
     * sec index2: no1--&gt;1, no2--&gt;2, ...
     * </pre>
     * where 1, 2, 3 are the keys of {@code mainDb}, and the two secondary
     * indexes (keyed by name and by no) point at those primary keys.
     *
     * <p>"Write data OK" is printed only when every put and the final sync
     * succeeded.
     */
    public void writeData() {
        openDBs();
        boolean written = false;
        try {
            for (int i = 0; i < 100; i++) {
                String k = i + "";
                DatabaseEntry key = createKeyEntry(k);
                DatabaseEntry value = createValueEntry(k);
                // No put on secDb1/secDb2 is needed: the indexes are maintained automatically.
                mainDb.put(null, key, value);
            }
            dbEnv.sync();
            written = true;
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            closeDBs();
        }
        if (written) {
            System.out.println("Write data OK");
        }
    }

    /**
     * Reads data by querying each of the two secondary indexes in turn and
     * printing the result ("[null]" when nothing is found).
     */
    public void queryData() {
        openDBs();
        try {
            System.out.println(entryToString(queryEntry("name50", secDb1)));
            System.out.println(entryToString(queryEntry("no55", secDb2)));
        } finally {
            closeDBs();
        }
    }

    /**
     * Join query. The index conditions of a join cursor are combined with AND,
     * not OR: a record is returned only if it satisfies all of them.
     */
    public void jointQueryData() {
        openDBs();

        String index1 = "name30";
        String index2 = "no30"; // yields one record; with "no45" the result would be empty
        Cursor cursor1 = null, cursor2 = null;
        JoinCursor joinCursor = null;
        try {
            cursor1 = secDb1.openCursor(null, null);
            cursor2 = secDb2.openCursor(null, null);

            DatabaseEntry retKey = new DatabaseEntry();
            DatabaseEntry retValue = new DatabaseEntry();

            DatabaseEntry key1 = createKeyEntry(index1);
            DatabaseEntry key2 = createKeyEntry(index2);

            // Position each secondary cursor on its key before joining.
            OperationStatus status1 = cursor1.getSearchKey(key1, retValue, LockMode.READ_UNCOMMITTED);
            OperationStatus status2 = cursor2.getSearchKey(key2, retValue, LockMode.READ_UNCOMMITTED);
            if (status1 == OperationStatus.SUCCESS && status2 == OperationStatus.SUCCESS) {
                joinCursor = mainDb.join(new Cursor[]{cursor1, cursor2}, null);
                while (joinCursor.getNext(retKey, retValue, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS) {
                    System.out.println(entryToString(retKey) + ":" + entryToString(retValue));
                }
            } else {
                System.out.println("[null]");
            }

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (cursor1 != null) {
                cursor1.close();
            }
            if (cursor2 != null) {
                cursor2.close();
            }
            if (joinCursor != null) {
                joinCursor.close();
            }
            closeDBs();
        }
    }

    /**
     * Opens the environment, the primary database and both secondary
     * databases, creating the data directory first if it does not exist.
     * If any step fails, the handles opened so far are closed again and the
     * failure is rethrown.
     *
     * @throws IllegalStateException if the data directory cannot be created
     */
    public void openDBs() {
        File file = new File(dataPath);
        if (!file.isDirectory() && !file.mkdirs()) {
            throw new IllegalStateException("Cannot create data directory: " + dataPath);
        }
        try {
            dbEnv = new Environment(file, getEnvConfig());
            mainDb = dbEnv.openDatabase(null, mainDbName, getMainDbConfig());
            secDb1 = dbEnv.openSecondaryDatabase(null, secDbName1, mainDb, getSecDbConfig(new SecKeyCreator("name")));
            secDb2 = dbEnv.openSecondaryDatabase(null, secDbName2, mainDb, getSecDbConfig(new SecKeyCreator("no")));
        } catch (RuntimeException ex) {
            try {
                closeDBs();
            } catch (RuntimeException ignored) {
                // The failure that aborted the open is the one worth reporting.
            }
            throw ex;
        }
    }

    /**
     * Closes the secondary databases, the primary database and the
     * environment, in that order. Handles that were never opened are skipped,
     * every remaining handle is still attempted if one close fails, and the
     * first failure is rethrown at the end.
     */
    public void closeDBs() {
        RuntimeException failure = null;
        for (Database handle : new Database[]{secDb1, secDb2, mainDb}) {
            if (handle == null) {
                continue;
            }
            try {
                handle.close();
            } catch (RuntimeException ex) {
                if (failure == null) {
                    failure = ex;
                }
            }
        }
        secDb1 = null;
        secDb2 = null;
        mainDb = null;

        if (dbEnv != null) {
            try {
                dbEnv.close();
            } catch (RuntimeException ex) {
                if (failure == null) {
                    failure = ex;
                }
            }
            dbEnv = null;
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Looks up {@code key} in {@code db}.
     *
     * @return the value entry, or {@code null} when the lookup does not
     *         report {@code OperationStatus.SUCCESS}
     */
    public DatabaseEntry queryEntry(String key, Database db) {
        DatabaseEntry keyEntry = createKeyEntry(key);
        DatabaseEntry valueEntry = new DatabaseEntry();

        OperationStatus status = db.get(null, keyEntry, valueEntry,
                LockMode.READ_UNCOMMITTED);
        if (status == OperationStatus.SUCCESS) {
            return valueEntry;
        }
        return null;
    }

    /** Wraps the UTF-8 bytes of {@code key} in a {@link DatabaseEntry}. */
    public DatabaseEntry createKeyEntry(String key) {
        return new DatabaseEntry(key.getBytes(UTF8_CHARSET));
    }

    /**
     * Builds the sample value {@code "name<key>,no<key>,birthday<key>"} and
     * wraps its UTF-8 bytes in a {@link DatabaseEntry}.
     */
    public DatabaseEntry createValueEntry(String key) {
        String data = "name" + key + "," + "no" + key + "," + "birthday" + key;
        return new DatabaseEntry(data.getBytes(UTF8_CHARSET));
    }

    /**
     * Decodes the entry's data as UTF-8 text.
     *
     * @return the decoded text, or {@code "[null]"} when the entry or its data
     *         is {@code null}
     */
    public String entryToString(DatabaseEntry entry) {
        if (entry == null || entry.getData() == null) {
            return "[null]";
        }
        return new String(entry.getData(), UTF8_CHARSET);
    }

    /** Environment settings: create if missing, writable, no transactions, no locking. */
    private EnvironmentConfig getEnvConfig() {
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setReadOnly(false);
        envConfig.setTransactional(false);
        envConfig.setLocking(false);
        return envConfig;
    }

    /** Primary database settings: create if missing, writable, non-transactional, unique keys. */
    private DatabaseConfig getMainDbConfig() {
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(false);
        dbConfig.setReadOnly(false);
        dbConfig.setDeferredWrite(false);
        dbConfig.setSortedDuplicates(false);
        return dbConfig;
    }

    /**
     * Secondary database settings using {@code keyCreator} to derive the
     * secondary keys; duplicates are allowed.
     */
    private SecondaryConfig getSecDbConfig(SecondaryKeyCreator keyCreator) {
        SecondaryConfig secConfig = new SecondaryConfig();
        secConfig.setKeyCreator(keyCreator);
        secConfig.setReadOnly(false);
        secConfig.setAllowCreate(true);
        // Allow an empty secondary database to be populated from the records
        // already present in the primary database when it is opened.
        secConfig.setAllowPopulate(true);
        secConfig.setSortedDuplicates(true);
        return secConfig;
    }

    /**
     * Tells the primary database how to derive a secondary key from one of its
     * records. The sample values have three fields (name, no, birthday) and
     * the secondary indexes cover name and no. Because each field is the field
     * name followed by the primary key, the secondary key is rebuilt as
     * {@code prefix + primaryKey} rather than parsed out of the value.
     * No TupleBinding is used here.
     */
    static class SecKeyCreator implements SecondaryKeyCreator {
        private final String prefix;

        public SecKeyCreator(String prefix) {
            this.prefix = prefix;
        }

        /**
         * Sets {@code result} to the UTF-8 bytes of {@code prefix + primaryKey}.
         *
         * @return always {@code true}: every primary record gets a secondary key
         */
        @Override
        public boolean createSecondaryKey(SecondaryDatabase secondary, DatabaseEntry key, DatabaseEntry data, DatabaseEntry result) {
            String k = prefix + new String(key.getData(), UTF8_CHARSET);
            result.setData(k.getBytes(UTF8_CHARSET));
            return true;
        }
    }

    /** Runs the demo: write the data, query both indexes, then run the join query. */
    public static void main(String[] args) {
        SimpleSecIndex instance = new SimpleSecIndex();
        instance.writeData();
        instance.queryData();
        instance.jointQueryData();
    }
}

JE常见异常

  • DatabaseNotFoundException 当没有找到指定的数据库的时候会返回这个异常
  • DeadlockException 线程间死锁异常
  • RunRecoveryException 需要运行恢复的异常,当发生此异常的时候,你必须关闭并重新打开数据库环境(Environment)。

关于日志文件

  • JE的日志文件只能APPEND,第一个日志文件名是 00000000.jdb,当它增长到一定大小的时候(默认是10M),开始写第二个日志文件00000001.jdb,以此类推。
  • 跟C版本有所不同,JE的数据日志和事务日志是放在一起的,而不是分开放的。
  • JE cleaner负责清扫没用到的磁盘空间,删除后,或者更新后新的记录会追加进来,而原有的记录空间就不再使用了,cleaner负责清理不用的空间。
  • 清理并不是立即进行的,当你关闭你的数据库环境后,通过调用一个cleaner方法来清理。
  • 清理也不是自动执行的,需要你自己手动调用cleaner 方法来定时清理的。
  • 日志文件的删除仅发生在检查点之后。cleaner准备出哪些log文件需要被删除,当检查点过后,删掉一些不再被使用的文件。默认情况下,每写20M的日志文件就执行一次检查点。

另,使用BDB时需要注意的一些问题和技巧,参见http://rockybalboa.blog.51cto.com/1010693/477387