## Prepare the data

Prepare an English text of about 500 words on your own.
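Before running the job, upload the text to HDFS (for example with `hdfs dfs -put`; the `/input` directory used in the run command at the end is just an assumed location).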

## MapReduce program design

Write a word co-occurrence program with a window size of 5.
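For example, with a window of 5, the fragment "the quick brown fox jumps over" first yields the pairs (the, quick), (the, brown), (the, fox), (the, jumps); the window then slides by one word and emits (quick, brown), (quick, fox), (quick, jumps), (quick, over), and so on until the text is exhausted. Pairs are unordered, so (the, fox) and (fox, the) are counted as the same pair.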

### WholeFileInputFormat

```java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileInputFormat extends FileInputFormat<Text, Text> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new WholeFileInputRecord((FileSplit) split, context.getConfiguration());
}
}
```

### WholeFileInputRecord

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileInputRecord extends RecordReader<Text, Text> {
private boolean progress = false;
private Text key = new Text();
private Text value = new Text();
private Configuration conf;
private FileSplit fileSplit;
private FSDataInputStream fis;
public WholeFileInputRecord(FileSplit fileSplit, Configuration conf) {
this.fileSplit = fileSplit;
this.conf = conf;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// Open the file that backs this split so nextKeyValue() can read it in full.
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
fis = fs.open(file);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!progress) {
byte[] content = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
key.set(file.getName());
try {
IOUtils.readFully(fis, content, 0, content.length);
value.set(content);
} catch (Exception e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(fis);
}
progress = true;
return true;
}
return false;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return progress ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException {
// The stream is already closed in nextKeyValue(), so there is nothing to release here.
}
}
```

### WordConcurrnce

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordConcurrnce {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if (args == null || args.length != 3) {
throw new RuntimeException("Usage: <input path> <output path> <window size>");
}
Configuration conf = new Configuration();
conf.setInt("windowSize", Integer.parseInt(args[2]));
Job job = Job.getInstance(conf);
job.setJobName("WordConcurrnce");
job.setJarByClass(WordConcurrnce.class);
job.setMapperClass(WordConcurrnceMap.class);
job.setMapOutputKeyClass(WordPair.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(WordConcurrnceReduce.class);
job.setOutputKeyClass(WordPair.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
```
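Since the reduce step is a plain summation, the same reducer class could optionally also be registered as a combiner with `job.setCombinerClass(WordConcurrnceReduce.class)` to shrink the map output; this is not part of the job configuration above.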

### WordConcurrnceMap

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordConcurrnceMap extends Mapper<Text, Text, WordPair, IntWritable> {
private int windowSize = 0;
private static final String WORD_REGEX = "([a-zA-Z]{1,})";// matches simple English words made up of letters only
private static final Pattern WORD_PATTERN = Pattern.compile(WORD_REGEX);// hyphenated words are therefore split at the hyphen
private Queue<String> queue = new LinkedList<String>();
private static final IntWritable ONE = new IntWritable(1);
protected void setup(Context context) throws java.io.IOException, InterruptedException {
Configuration conf = context.getConfiguration();
windowSize = conf.getInt("windowSize", 20);
}
protected void map(Text key, Text value, Context context) throws java.io.IOException, InterruptedException {
System.out.println("value:" + value);
System.out.println("windowSize:" + windowSize);
Matcher matcher = WORD_PATTERN.matcher(value.toString());
while (matcher.find()) {
String word = matcher.group();
queue.add(word);
if (queue.size() >= windowSize) {
wordPair(context);
}
}
// After the last word of the text, drain the remaining window.
while (queue.size() > 1) {
wordPair(context);
}
}
// Pair the word at the head of the window with every other word in it,
// emit each pair with a count of 1, then slide the window by removing the head.
private void wordPair(Context context) throws IOException, InterruptedException {
Iterator<String> it = queue.iterator();
String wordA = it.next();
while (it.hasNext()) {
String wordB = it.next();
context.write(new WordPair(wordA, wordB), ONE);
}
queue.remove();
}
}
```
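The sliding-window logic above can be checked without Hadoop. Below is a minimal local sketch (the class name `WindowPairDemo` and the sample sentence are made up for illustration) that reproduces the same pairing behaviour and prints what the mapper would emit:

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class WindowPairDemo {
    private static final Pattern WORD_PATTERN = Pattern.compile("[a-zA-Z]+");

    public static void main(String[] args) {
        int windowSize = 5; // same window size as the assignment
        String text = "the quick brown fox jumps over the lazy dog";
        Queue<String> queue = new LinkedList<>();

        Matcher matcher = WORD_PATTERN.matcher(text);
        while (matcher.find()) {
            queue.add(matcher.group());
            if (queue.size() >= windowSize) {
                emitPairs(queue);
            }
        }
        // Drain the remaining words at the end of the text.
        while (queue.size() > 1) {
            emitPairs(queue);
        }
    }

    // Pair the head of the window with every other word, then slide the window.
    private static void emitPairs(Queue<String> queue) {
        Iterator<String> it = queue.iterator();
        String wordA = it.next();
        while (it.hasNext()) {
            System.out.println(wordA + "," + it.next() + "\t1");
        }
        queue.remove();
    }
}
```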

### WordConcurrnceReduce

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class WordConcurrnceReduce extends Reducer<WordPair, IntWritable, WordPair, IntWritable> {
private IntWritable wordSum = new IntWritable();
protected void reduce(WordPair key, Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
wordSum.set(sum);
context.write(key, wordSum);
}
}
```

### WordPair

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class WordPair implements WritableComparable<WordPair> {
private String wordA;
private String wordB;
public WordPair() {
}
public WordPair(String wordA, String wordB) {
this.wordA = wordA;
this.wordB = wordB;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(wordA);
out.writeUTF(wordB);
}
public void readFields(DataInput in) throws IOException {
wordA = in.readUTF();
wordB = in.readUTF();
}
@Override
public String toString() {
return wordA + "," + wordB;
}
@Override
public int hashCode() {
// Symmetric, so (a, b) and (b, a) hash to the same value.
return (wordA.hashCode() + wordB.hashCode()) * 9;
}
public int compareTo(WordPair o) {
// equals() is symmetric, so (a, b) and (b, a) compare as the same key.
if (equals(o)) {
return 0;
}
// Join with a separator so that pairs such as ("ab", "c") and ("a", "bc") do not collide.
return (wordA + "\t" + wordB).compareTo(o.wordA + "\t" + o.wordB);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof WordPair)) {
return false;
}
WordPair w = (WordPair) obj;
if (wordA.equals(w.wordA) && wordB.equals(w.wordB)) {
return true;
}
if (wordA.equals(w.wordB) && wordB.equals(w.wordA)) {
return true;
}
return false;
}
}
```
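A quick, hypothetical sanity check of the symmetric `equals`/`hashCode`/`compareTo` contract (not part of the job; it only prints what the methods above return):

```java
public class WordPairCheck {
    public static void main(String[] args) {
        // Co-occurrence pairs are unordered, so (a, b) and (b, a) must behave as the same key.
        WordPair p1 = new WordPair("data", "node");
        WordPair p2 = new WordPair("node", "data");
        System.out.println(p1.equals(p2));                   // true
        System.out.println(p1.hashCode() == p2.hashCode());  // true: hashCode is symmetric
        System.out.println(p1.compareTo(p2));                // 0, so both orderings group together
    }
}
```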

## Run
```
hadoop jar WordOccurence.jar /input /output 5
```
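Since the job uses `TextOutputFormat` and `WordPair.toString()` joins the two words with a comma, each line of the result files under `/output` has the form `wordA,wordB<TAB>count`, for example (the words and counts below are purely illustrative):

```
brown,fox	2
jumps,quick	1
```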