Introduction

The input data (each line lists a user, then the comma-separated users that user follows):
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

Problem » We have a set of log data for users A-O; each user has a list of users they follow. For every pair of users, we want to know which users they both follow, i.e. their common friends.

Below, we solve this with MapReduce.

First, let's look at the result of each processing stage in diagram form:

[Figure: intermediate results of each stage; the third column is truncated in the original.]

As the diagram shows, two rounds of computation clearly produce the result.

First, let's lay out the approach:

  1. Invert the relation: find, for each user, which users follow them.

    For example: A is followed by I, K, C, B, G, F, H, O, D.

  2. Any two followers of the same user have that user as a common friend (see the plain-Java sketch after this list).

    For example: I and K have common friend A, and C and I have common friend A.
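
To make the idea concrete before the Hadoop version, here is a minimal plain-Java sketch of the same two steps. The class name, variable names, and the reduced data set are illustrative only, not part of the original code:

import java.util.*;

public class CommonFriendSketch {
    public static void main(String[] args) {
        // A small subset of the sample data: "follower:comma-separated followees"
        String[] lines = {"A:B,C,D,F,E,O", "B:A,C,E,K", "I:A,O", "K:A,C,D"};

        // Step 1: invert the relation -> followee => list of followers
        Map<String, List<String>> followers = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(":");
            for (String followee : parts[1].split(",")) {
                followers.computeIfAbsent(followee, x -> new ArrayList<>()).add(parts[0]);
            }
        }

        // Step 2: every pair of followers of the same user shares that user
        Map<String, List<String>> common = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : followers.entrySet()) {
            List<String> fs = e.getValue();
            Collections.sort(fs); // canonical order so "I-K" and "K-I" never both appear
            for (int i = 0; i < fs.size(); i++) {
                for (int j = i + 1; j < fs.size(); j++) {
                    common.computeIfAbsent(fs.get(i) + "-" + fs.get(j), x -> new ArrayList<>())
                          .add(e.getKey());
                }
            }
        }
        // Prints, among others: "I-K  [A]" -- I and K share the common friend A.
        common.forEach((pair, friends) -> System.out.println(pair + "\t" + friends));
    }
}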

Code environment configuration (Maven pom.xml)
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>

    </dependencies>

Solving this problem with MapReduce likewise takes two steps.

Step 1

We need to process the stage-0 data from the diagram into the stage-1 result: each user mapped to the users who follow them.

First Map
package com.tangf.friend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FriendMapper1 extends Mapper<LongWritable, Text, Text, Text> {

    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line looks like "A:B,C,D,F,E,O": user A follows B, C, D, F, E, O.
        final String[] split = value.toString().split(":");
        v.set(split[0]);  // the follower
        final String[] men = split[1].split(",");
        for (String man : men) {
            // Emit (followee, follower), inverting the relation so the reduce
            // phase can collect all followers of each user.
            k.set(man);
            context.write(k, v);
        }
    }
}

DEBUG

A:B,C,D,F,E,O

Line processing:
1. Split on ":" into two parts
2. The first part is the user doing the following (A)
3. The second part, split on ",", is the list of users being followed
4. Output one (followee, follower) pair per followed user:
B A
C A
D A
F A
E A
O A

The first Map's complete output:
B	A
C	A
D	A
F	A
E	A
O	A
A	B
C	B
E	B
K	B
F	C
A	C
D	C
I	C
A	D
E	D
F	D
L	D
B	E
C	E
D	E
M	E
L	E
A	F
B	F
C	F
D	F
E	F
O	F
M	F
A	G
C	G
D	G
E	G
F	G
A	H
C	H
D	H
E	H
O	H
A	I
O	I
B	J
O	J
A	K
C	K
D	K
D	L
E	L
F	L
E	M
F	M
G	M
A	O
H	O
I	O
J	O

First Reduce
package com.tangf.friend;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FriendReducer extends Reducer<Text, Text, Text, Text> {

    private Text v = new Text();
    private StringBuffer sb = new StringBuffer();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // All values for one key arrive together; concatenate them into a
        // comma-separated list (in step 1: user -> all of their followers).
        sb.delete(0, sb.length());  // reset the reused buffer between keys
        for (Text value : values) {
            sb.append(value).append(",");
        }
        v.set(sb.toString());
        context.write(key, v);
    }
}
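
Note the trailing comma on every line of the output below: the loop appends a separator after each value unconditionally. This is harmless here, because String.split(",") in the second Map drops trailing empty strings, but if you want clean output, here is a sketch of an alternative reduce body using java.util.StringJoiner (my variant, not the original code):

    // Variant reduce body: join values with commas, no trailing separator.
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        java.util.StringJoiner joiner = new java.util.StringJoiner(",");
        for (Text value : values) {
            joiner.add(value.toString());
        }
        v.set(joiner.toString());
        context.write(key, v);
    }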

DEBUG

A	I,K,C,B,G,F,H,O,D,
B	A,F,J,E,
C	A,E,B,H,F,G,K,
D	G,C,K,A,L,F,E,H,
E	G,M,L,H,A,F,B,D,
F	L,M,D,C,G,A,
G	M,
H	O,
I	O,C,
J	O,
K	B,
L	D,E,
M	E,F,
O	A,H,I,J,F,
Step 2

Second Map
package com.tangf.friend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FriendMapper2 extends Mapper<LongWritable, Text, Text, Text> {

    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input is step 1's output, e.g. "A\tI,K,C,B,G,F,H,O,D,":
        // user A, then the list of users who follow A.
        final String[] split = value.toString().split("\t");
        v.set(split[0]);  // the common friend shared by every pair below

        // Emit every unordered pair of A's followers, with the key in sorted
        // order so that (I,K) and (K,I) group under the same key "I-K".
        final String[] ss = split[1].split(",");
        for (int i = 0; i < ss.length; i++) {
            for (int j = i + 1; j < ss.length; j++) {
                if (ss[i].compareTo(ss[j]) > 0) {
                    k.set(ss[j] + "-" + ss[i]);
                } else {
                    k.set(ss[i] + "-" + ss[j]);
                }
                context.write(k, v);
            }
        }
    }
}
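
The compareTo branch is what makes the pair key canonical: the same pair of followers can appear as (I, K) in one list and (K, I) in another, and both occurrences must land on the same reduce key. A tiny standalone illustration (the class name PairKeyDemo is hypothetical):

public class PairKeyDemo {

    // Always put the lexicographically smaller user first,
    // mirroring the if/else in FriendMapper2.
    static String pairKey(String a, String b) {
        return a.compareTo(b) > 0 ? b + "-" + a : a + "-" + b;
    }

    public static void main(String[] args) {
        System.out.println(pairKey("K", "I"));  // I-K
        System.out.println(pairKey("I", "K"));  // I-K -> same reduce group
    }
}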

DEBUG

I-K	A
C-I	A
B-I	A
G-I	A
F-I	A
H-I	A
I-O	A
D-I	A
C-K	A
B-K	A
G-K	A
F-K	A
H-K	A
K-O	A
D-K	A
B-C	A
C-G	A
C-F	A
C-H	A
C-O	A
C-D	A
B-G	A
B-F	A
B-H	A
B-O	A
B-D	A
F-G	A
G-H	A
G-O	A
D-G	A
F-H	A
F-O	A
D-F	A
H-O	A
D-H	A
D-O	A
A-F	B
A-J	B
A-E	B
F-J	B
E-F	B
E-J	B
A-E	C
A-B	C
A-H	C
A-F	C
A-G	C
A-K	C
B-E	C
E-H	C
E-F	C
E-G	C
E-K	C
B-H	C
B-F	C
B-G	C
B-K	C
F-H	C
G-H	C
H-K	C
F-G	C
F-K	C
G-K	C
C-G	D
G-K	D
A-G	D
G-L	D
F-G	D
E-G	D
G-H	D
C-K	D
A-C	D
C-L	D
C-F	D
C-E	D
C-H	D
A-K	D
K-L	D
F-K	D
E-K	D
H-K	D
A-L	D
A-F	D
A-E	D
A-H	D
F-L	D
E-L	D
H-L	D
E-F	D
F-H	D
E-H	D
G-M	E
G-L	E
G-H	E
A-G	E
F-G	E
B-G	E
D-G	E
L-M	E
H-M	E
A-M	E
F-M	E
B-M	E
D-M	E
H-L	E
A-L	E
F-L	E
B-L	E
D-L	E
A-H	E
F-H	E
B-H	E
D-H	E
A-F	E
A-B	E
A-D	E
B-F	E
D-F	E
B-D	E
L-M	F
D-L	F
C-L	F
G-L	F
A-L	F
D-M	F
C-M	F
G-M	F
A-M	F
C-D	F
D-G	F
A-D	F
C-G	F
A-C	F
A-G	F
C-O	I
D-E	L
E-F	M
A-H	O
A-I	O
A-J	O
A-F	O
H-I	O
H-J	O
F-H	O
I-J	O
F-I	O
F-J	O

Second Reduce

The reducer is exactly the same class as in the first Reduce, so the code is not repeated here.


DEBUG

A-B	C,E,
A-C	D,F,
A-D	F,E,
A-E	B,D,C,
A-F	C,B,O,D,E,
A-G	F,D,E,C,
A-H	D,O,E,C,
A-I	O,
A-J	O,B,
A-K	D,C,
A-L	E,D,F,
A-M	F,E,
B-C	A,
B-D	E,A,
B-E	C,
B-F	E,A,C,
B-G	C,A,E,
B-H	E,C,A,
B-I	A,
B-K	C,A,
B-L	E,
B-M	E,
B-O	A,
C-D	F,A,
C-E	D,
C-F	D,A,
C-G	D,F,A,
C-H	A,D,
C-I	A,
C-K	D,A,
C-L	D,F,
C-M	F,
C-O	I,A,
D-E	L,
D-F	A,E,
D-G	E,A,F,
D-H	E,A,
D-I	A,
D-K	A,
D-L	E,F,
D-M	E,F,
D-O	A,
E-F	B,M,D,C,
E-G	D,C,
E-H	D,C,
E-J	B,
E-K	D,C,
E-L	D,
F-G	C,D,A,E,
F-H	A,E,O,C,D,
F-I	O,A,
F-J	O,B,
F-K	A,C,D,
F-L	E,D,
F-M	E,
F-O	A,
G-H	A,D,E,C,
G-I	A,
G-K	C,A,D,
G-L	F,D,E,
G-M	F,E,
G-O	A,
H-I	A,O,
H-J	O,
H-K	A,D,C,
H-L	D,E,
H-M	E,
H-O	A,
I-J	O,
I-K	A,
I-O	A,
K-L	D,
K-O	A,
L-M	F,E,

Summary

Map is used to project the data into a new structure, and Reduce is used to aggregate the grouped results: chaining two such passes (first invert the follow relation, then pair up followers) is all it takes to compute common friends.
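
The article never shows the driver that chains these two jobs together. Here is a minimal sketch of what one could look like; the class name FriendDriver, the job names, and the three path arguments are my assumptions, not part of the original code:

package com.tangf.friend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver chaining the two jobs; not from the original article.
public class FriendDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path(args[0]);   // raw "A:B,C,..." log file
        Path temp = new Path(args[1]);    // intermediate "user <TAB> followers" data
        Path output = new Path(args[2]);  // final "pair <TAB> common friends" result

        // Job 1: invert the follow relation (FriendMapper1 + FriendReducer).
        Job job1 = Job.getInstance(conf, "common-friend-step1");
        job1.setJarByClass(FriendDriver.class);
        job1.setMapperClass(FriendMapper1.class);
        job1.setReducerClass(FriendReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, input);
        FileOutputFormat.setOutputPath(job1, temp);
        if (!job1.waitForCompletion(true)) System.exit(1);

        // Job 2: pair up followers per common friend (FriendMapper2 + FriendReducer).
        Job job2 = Job.getInstance(conf, "common-friend-step2");
        job2.setJarByClass(FriendDriver.class);
        job2.setMapperClass(FriendMapper2.class);
        job2.setReducerClass(FriendReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, temp);
        FileOutputFormat.setOutputPath(job2, output);
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}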