1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
/**
 * Writes 100 key/value records to a SequenceFile at the URI given as the first
 * command-line argument.
 *
 * <p>Keys are {@code IntWritable} values counting down from 100 to 1; values are
 * {@code Text} lines cycled from {@link #DATA}. Before each append, the writer's
 * current length, the key and the value are printed to standard output.
 */
public class SequenceFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  /**
   * @param args {@code args[0]} is the URI of the sequence file to create
   * @throws IOException if the file system or the writer reports an I/O failure
   */
  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    // A single key and a single value object are reused for every record.
    IntWritable key = new IntWritable();
    Text value = new Text();

    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
      for (int remaining = 100; remaining > 0; remaining--) {
        int lineIndex = (100 - remaining) % DATA.length;
        key.set(remaining);
        value.set(DATA[lineIndex]);
        // getLength() is read before the append, i.e. the current position in the file.
        System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}
|
% hadoop SequenceFileWriteDemo numbers.seq
[128] 100 One, two, buckle my shoe
[173] 99 Three, four, shut the door
[220] 98 Five, six, pick up sticks
[264] 97 Seven, eight, lay them straight
[314] 96 Nine, ten, a big fat hen
[359] 95 One, two, buckle my shoe
[404] 94 Three, four, shut the door
[451] 93 Five, six, pick up sticks
[495] 92 Seven, eight, lay them straight
[545] 91 Nine, ten, a big fat hen
...
[1976] 60 One, two, buckle my shoe
[2021] 59 Three, four, shut the door
[2088] 58 Five, six, pick up sticks
[2132] 57 Seven, eight, lay them straight
[2182] 56 Nine, ten, a big fat hen
...
[4557] 5 One, two, buckle my shoe
[4602] 4 Three, four, shut the door
[4649] 3 Five, six, pick up sticks
[4693] 2 Seven, eight, lay them straight
[4743] 1 Nine, ten, a big fat hen
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
/**
 * Reads every record of the SequenceFile at the URI given as the first
 * command-line argument and prints, for each record, the position at which it
 * starts, a {@code *} if a sync point was seen, the key and the value.
 */
public class SequenceFileReadDemo {

  /**
   * @param args {@code args[0]} is the URI of the sequence file to read
   * @throws IOException if the file system or the reader reports an I/O failure
   */
  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(fs, path, conf);

      // The key and value types are obtained from the reader, then instantiated reflectively.
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

      // `position` always holds the offset at which the record being printed began;
      // it is refreshed after each record so it points at the beginning of the next one.
      for (long position = reader.getPosition();
          reader.next(key, value);
          position = reader.getPosition()) {
        String syncMarker;
        if (reader.syncSeen()) {
          syncMarker = "*"; // a sync point was seen: mark the record with an asterisk
        } else {
          syncMarker = "";
        }
        System.out.printf("[%s%s]\t%s\t%s\n", position, syncMarker, key, value);
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}
|
// Instantiate key and value holders whose concrete classes are reported by the reader.
Writable key = (Writable)
ReflectionUtils.newInstance(reader.getKeyClass(), conf);// the key's data type is obtained from the reader
Writable value = (Writable)
ReflectionUtils.newInstance(reader.getValueClass(), conf);
% hadoop SequenceFileReadDemo numbers.seq
[128] 100 One, two, buckle my shoe
[173] 99 Three, four, shut the door
[220] 98 Five, six, pick up sticks
[264] 97 Seven, eight, lay them straight
[314] 96 Nine, ten, a big fat hen
[359] 95 One, two, buckle my shoe
[404] 94 Three, four, shut the door
[451] 93 Five, six, pick up sticks
[495] 92 Seven, eight, lay them straight
[545] 91 Nine, ten, a big fat hen
[590] 90 One, two, buckle my shoe
...
[1976] 60 One, two, buckle my shoe
[2021*] 59 Three, four, shut the door
[2088] 58 Five, six, pick up sticks
[2132] 57 Seven, eight, lay them straight
[2182] 56 Nine, ten, a big fat hen
...
[4557] 5 One, two, buckle my shoe
[4602] 4 Three, four, shut the door
[4649] 3 Five, six, pick up sticks
[4693] 2 Seven, eight, lay them straight
[4743] 1 Nine, ten, a big fat hen
// Seeking to 359 and reading the next record is asserted to succeed and to yield key 95.
reader.seek(359);assertThat(reader.next(key,value),is(true));assertThat(((IntWritable)key).get(),is(95));
// Seeking to 360 and then calling next() is expected to fail.
reader.seek(360);reader.next(key,value);// fails with IOException
// After sync(360) the reader's position is asserted to be 2021, and the next record read has key 59.
reader.sync(360);assertThat(reader.getPosition(),is(2021L));assertThat(reader.next(key,value),is(true));assertThat(((IntWritable)key).get(),is(59));
% hadoop fs -text numbers.seq | head
100 One, two, buckle my shoe
99 Three, four, shut the door
98 Five, six, pick up sticks
97 Seven, eight, lay them straight
96 Nine, ten, a big fat hen
95 One, two, buckle my shoe
94 Three, four, shut the door
93 Five, six, pick up sticks
92 Seven, eight, lay them straight
91 Nine, ten, a big fat hen
% hadoop jar \
  $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
  sort -r 1 \
  -inFormat org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat \
  -outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
  -outKey org.apache.hadoop.io.IntWritable \
  -outValue org.apache.hadoop.io.Text \
  numbers.seq sorted
% hadoop fs -text sorted/part-r-00000 | head
1 Nine, ten, a big fat hen
2 Seven, eight, lay them straight
3 Five, six, pick up sticks
4 Three, four, shut the door
5 One, two, buckle my shoe
6 Nine, ten, a big fat hen
7 Seven, eight, lay them straight
8 Five, six, pick up sticks
9 Three, four, shut the door
10 One, two, buckle my shoe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
/**
 * Writes 1024 entries to a MapFile at the URI given as the first command-line
 * argument.
 *
 * <p>Keys are {@code IntWritable} values 1 through 1024, appended in ascending
 * order; values are {@code Text} lines cycled from {@link #DATA}.
 */
public class MapFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  /**
   * @param args {@code args[0]} is the URI of the MapFile to create
   * @throws IOException if the file system or the writer reports an I/O failure
   */
  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    // A MapFile key is a WritableComparable while the value is a plain Writable:
    // building the key index requires keys to be comparable.
    IntWritable key = new IntWritable();
    Text value = new Text();

    MapFile.Writer writer = null;
    try {
      writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass());
      for (int entry = 1; entry <= 1024; entry++) {
        key.set(entry);
        value.set(DATA[(entry - 1) % DATA.length]);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
/**
 * Iterates over every entry of the MapFile at {@code /numbers.map} and prints
 * each key and value to standard output.
 *
 * <p>The reader is opened inside a {@code try} block and released in
 * {@code finally}, so it is closed even when instantiating the key/value
 * objects or reading an entry throws.
 *
 * @param args command-line arguments (not used)
 * @throws IOException if the file system or the reader reports an I/O failure
 */
public static void main(String[] args) throws IOException {
  Configuration conf = new Configuration();
  // NOTE(review): `uri` is not declared in this method; presumably it is a
  // field of the enclosing class that selects the file system — confirm.
  FileSystem fs = FileSystem.get(URI.create(uri), conf);
  Path path = new Path("/numbers.map");

  MapFile.Reader reader = null;
  try {
    reader = new MapFile.Reader(fs, path.toString(), conf);

    // Key and value holders are created from the classes reported by the reader.
    WritableComparable key =
        (WritableComparable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

    while (reader.next(key, value)) {
      System.out.println("key = " + key);
      System.out.println("value = " + value);
    }
  } finally {
    // Runs on both the normal and the exceptional path; reader may still be
    // null here if the constructor threw.
    IOUtils.closeStream(reader);
  }
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
/**
 * Looks up {@code key} and, when it is present, reads its value into {@code val}.
 *
 * @param key the key to look up
 * @param val the object the value is read into
 * @return {@code val} when the key exists, otherwise {@code null}
 * @throws IOException if seeking or reading the value fails
 */
public synchronized Writable get(WritableComparable key, Writable val) throws IOException {
  // Locate the key first; without an exact match there is nothing to read.
  if (!seek(key)) {
    return null;
  }
  // The data reader is now positioned on the matching entry: read its value.
  data.getCurrentValue(val);
  return val;
}
/**
 * Positions the reader at the named key or, if no such key exists, at the
 * first entry after the named key.
 *
 * @param key the key to position at
 * @return {@code true} iff the named key exists in this map
 * @throws IOException if the underlying seek fails
 */
public synchronized boolean seek(WritableComparable key) throws IOException {
  // seekInternal reports 0 exactly when the key was matched.
  int comparison = seekInternal(key);
  return comparison == 0;
}
/**
 * Positions the data reader relative to {@code key}, using the index arrays
 * ({@code keys} / {@code positions}) to pick a starting offset and then
 * scanning forward through the data entries.
 *
 * @param key the key being searched for
 * @param before when true and there is no exact match, the reader is rewound
 *     to the entry preceding the first key beyond {@code key} (if one was
 *     seen during this scan)
 * @return the result of {@code comparator.compare(key, nextKey)} for the first
 *     entry at or beyond {@code key} (0 on an exact match); 1 when the reader
 *     was rewound to a previous entry, or when the scan reached the end of the
 *     data without finding an entry at or beyond {@code key}
 * @throws IOException if reading the index or the data fails
 */
private
synchronized
int
seekInternal(WritableComparable key,
final
boolean
before)
throws
IOException {
readIndex();
// make sure index is read
if
(seekIndex != -
1
// seeked before
&& seekIndex+
1
< count
&& comparator.compare(key, keys[seekIndex+
1
])<
0
// before next indexed
&& comparator.compare(key, nextKey)
>=
0
) {
// but after last seeked
// do nothing: the seekIndex/seekPosition from the previous seek are reused
}
else
{
seekIndex = binarySearch(key);
if
(seekIndex <
0
)
// decode insertion point
// (presumably binarySearch returns -(insertionPoint)-1 on a miss, so this
// yields the index entry just before the insertion point; confirm)
seekIndex = -seekIndex-
2
;
if
(seekIndex == -
1
)
// belongs before first entry
seekPosition = firstPosition;
// use beginning of file
else
seekPosition = positions[seekIndex];
// else use the position recorded for that index entry
}
data.seek(seekPosition);// move the data reader to the starting position, then scan forward from there
if
(nextKey ==
null
)
nextKey = comparator.newKey();
// If we're looking for the key before, we need to keep track
// of the position we got the current key as well as the position
// of the key before it.
long
prevPosition = -
1
;
long
curPosition = seekPosition;
while
(data.next(nextKey)) {// read the next key into nextKey
int
c = comparator.compare(key, nextKey);// compare the search key with the key just read
if
(c <=
0
) {
// at or beyond desired: the key just read is equal to or past the search key
if
(before && c !=
0
) {// no exact match and the caller wants the preceding entry
if
(prevPosition == -
1
) {
// We're on the first record of this index block
// and we've already passed the search key. Therefore
// we must be at the beginning of the file, so seek
// to the beginning of this block and return c
data.seek(curPosition);// re-seek to the start of this scan
}
else
{
// We have a previous record to back up to
data.seek(prevPosition);// go back to the previous record's position
data.next(nextKey);
// now that we've rewound, the search key must be greater than this key
return
1
;
}
}
return
c;
}
if
(before) {// not there yet: remember the positions of the previous and current records
prevPosition = curPosition;
curPosition = data.getPosition();
}
}
// The data ran out before an entry at or beyond the search key was found.
return
1
;
}
/**
 * Reads the value belonging to the most recently read key into {@code val}.
 *
 * <p>If {@code val} is {@code Configurable}, it is handed this reader's
 * configuration before being populated.
 *
 * @param val the object the value is read into
 * @throws IOException if reading fails, or (non-block-compressed case) if
 *     bytes remain in the value stream after {@code val} has been read
 */
public synchronized void getCurrentValue(Writable val)
    throws IOException {
  if (val instanceof Configurable) {
    ((Configurable) val).setConf(this.conf);
  }

  // Position stream to 'current' value
  seekToCurrentValue();

  if (blockCompressed) {
    // Block-compressed: consume the length entry, then the value itself.
    int valLength = WritableUtils.readVInt(valLenIn);
    val.readFields(valIn);

    // One fewer buffered value remains to be read.
    --noBufferedValues;

    // Sanity check
    if ((valLength < 0) && LOG.isDebugEnabled()) {
      LOG.debug(val + " is a zero-length value");
    }
    return;
  }

  val.readFields(valIn);
  // Probe for leftover data after the value has been deserialized.
  if (valIn.read() > 0) {
    LOG.info("available bytes: " + valIn.available());
    throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                          + " bytes, should read " +
                          (valBuffer.getLength()-keyLength));
  }
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/**
 * Re-creates the index of a MapFile from its data file.
 *
 * <p>The key and value classes are read from the data sequence file inside the
 * MapFile directory given as the first command-line argument, then passed to
 * {@code MapFile.fix}; the number of entries it reports is printed.
 */
public class MapFileFixer {

  /**
   * @param args {@code args[0]} is the URI of the MapFile directory
   * @throws Exception if reading the data file or fixing the MapFile fails
   */
  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws Exception {
    String mapUri = args[0];

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
    Path mapDir = new Path(mapUri);
    Path dataFile = new Path(mapDir, MapFile.DATA_FILE_NAME);

    // Get key and value types from data sequence file
    SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, dataFile, conf);
    Class keyType = dataReader.getKeyClass();
    Class valueType = dataReader.getValueClass();
    dataReader.close();

    // Create the map file index file
    long entryCount = MapFile.fix(fs, mapDir, keyType, valueType, false, conf);
    System.out.printf("Created MapFile %s with %d entries\n", mapDir, entryCount);
  }
}
|