MapReduce experiments on the Azure platform
MapReduce on the Azure platform
Microsoft's "Getting started" article
In these experiments we use the shell, Python and stream I/O, so no packages or development environments need to be installed.
Microsoft's Python tutorial
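Hadoop Streaming talks to the mapper and reducer purely through stdin/stdout, so the model can be approximated on any machine with an ordinary shell pipeline. A rough sketch (the real framework also partitions and sorts the map output across the cluster; input.txt is a placeholder, mapper.py and reducer.py are defined below):

cat input.txt | ./mapper.py | sort | ./reducer.py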
Experiment: word count
mapper.py
#!/usr/bin/env python
import sys

# 'file' in this case is STDIN
def read_input(file):
    # Split each line into words
    for line in file:
        yield line.split()

def main(separator='\t'):
    data = read_input(sys.stdin)
    # Process each list of words returned from read_input
    for words in data:
        # Process each word
        for word in words:
            # Write to STDOUT
            print('%s%s%d' % (word, separator, 1))

if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    # Go through each line
    for line in file:
        # Strip out the separator character
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    # Group words and counts into 'group'
    # Since MapReduce is a distributed process, each word
    # may have multiple counts. 'group' will have all counts
    # which can be retrieved using the word as the key.
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            # For each word, pull the count(s) for the word
            # from 'group' and create a total count
            total_count = sum(int(count) for current_word, count in group)
            # Write to stdout
            print("%s%s%d" % (current_word, separator, total_count))
        except ValueError:
            # Count was not a number, so do nothing
            pass

if __name__ == "__main__":
    main()
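Before submitting anything to the cluster, the pair of scripts can be rehearsed locally; a plain sort stands in for the framework's shuffle. This assumes mapper.py and reducer.py are in the current directory and uses the same input file as the cluster job below:

chmod +x mapper.py reducer.py
hdfs dfs -text /example/data/gutenberg/davinci.txt | ./mapper.py | sort | ./reducer.py | head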
Some input files are already provided:
hdfs dfs -ls /example/data/
Let's run the job on one of them; /example is writable storage:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input wasbs:///example/data/gutenberg/davinci.txt -output wasbs:///example/wordcountout
Output:
hdfs dfs -text /example/wordcountout/part-00000 | head -50
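The part file is not ordered by frequency; a quick way to see the most common words is to sort the second (count) column numerically. The -t$'\t' separator syntax assumes a bash-like shell:

hdfs dfs -text /example/wordcountout/part-00000 | sort -t$'\t' -k2,2 -rn | head -20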
Experiment: sort
mapper.py
#!/usr/bin/env python
import sys

# 'file' in this case is STDIN
def read_input(file):
    for line in file:
        yield (x.strip() for x in line.split(","))

def main(separator='\t'):
    data = read_input(sys.stdin)
    # For movielens ratings.csv
    for userid, movieid, rating, ts in data:
        hhash = "%s|%s" % (rating, movieid)
        rest = "%s|%s" % (userid, ts)
        print('%s%s%s' % (hhash, separator, rest))

if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
#from itertools import groupby
#from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    # just copy output (assuming it might be fed to other
    # reducers)
    for hhash, rest in data:
        print("%s%s%s" % (hhash, separator, rest))

if __name__ == "__main__":
    main()
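As with word count, the pair can be tried locally on a slice of the data first. This assumes the ml-20m archive has been unpacked in the current directory; tail -n +2 skips the CSV header line:

tail -n +2 ml-20m/ratings.csv | head -1000 | python mapper.py | sort | python reducer.py | head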
Test data: http://files.grouplens.org/datasets/movielens/ml-20m.zip
Copy it into HDFS:
hdfs dfs -put ml-20m/ratings.csv /example/data/
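Running the job mirrors the word-count invocation; the output directory name is just an example and must not exist beforehand. Setting mapreduce.job.reduces=1 (a standard Hadoop property) sends all keys to a single reducer, so they arrive in sorted order and end up in one part file:

yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -D mapreduce.job.reduces=1 -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input wasbs:///example/data/ratings.csv -output wasbs:///example/sortout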
Experiment: common interests
Working from social media data: who follows whom on Twitter. We find the common interests of two arbitrary users, i.e. the accounts they both follow.
Testandmed: https://snap.stanford.edu/data/higgs-social_network.edgelist.gz
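The edge list can be fetched, decompressed and uploaded to the same writable storage area used earlier (file names follow from the URL):

wget https://snap.stanford.edu/data/higgs-social_network.edgelist.gz
gunzip higgs-social_network.edgelist.gz
hdfs dfs -put higgs-social_network.edgelist /example/data/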
First step: collect the followers of each user
mapper.py
#!/usr/bin/env python
import sys

# 'file' in this case is STDIN
def read_input(file):
    for line in file:
        yield (x.strip() for x in line.split(" ", 1))

# stage 1: map 'followed' to key
def main(separator='\t'):
    data = read_input(sys.stdin)
    for follower, followed in data:
        print('%s%s%s' % (followed, separator, follower))

if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

# stage 1: group all followers
def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for followed, group in groupby(data, itemgetter(0)):
        try:
            # remove dupes (set will do that automagically
            # and efficiently)
            follower_set = set(
                follower for dummy, follower in group)
            all_followers = ",".join(list(follower_set))
            # Write to stdout
            print("%s%s%s" % (followed, separator, all_followers))
        except ValueError:
            pass

if __name__ == "__main__":
    main()
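The first stage is launched just like the earlier jobs; the output directory (here wasbs:///example/followersout) is an arbitrary name for the intermediate result that the second stage will read:

yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input wasbs:///example/data/higgs-social_network.edgelist -output wasbs:///example/followersout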
Second step: for each pair of users, find whom they both follow
mapper.py
#!/usr/bin/env python
import sys

# 'file' in this case is STDIN
def read_input(file):
    for line in file:
        followed, all_fw = line.rstrip().split("\t", 1)
        yield (followed, all_fw.split(","))

# stage 2: all pairs of followers
def main(separator='\t'):
    data = read_input(sys.stdin)
    for followed, all_fw in data:
        fwc = len(all_fw)
        for i in range(fwc - 1):
            for j in range(i + 1, fwc):
                # emit pair in a canonical (sorted) order
                if all_fw[i] < all_fw[j]:
                    pair = "%s|%s" % (all_fw[i], all_fw[j])
                else:
                    pair = "%s|%s" % (all_fw[j], all_fw[i])
                print('%s%s%s' % (pair, separator, followed))

if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

# stage 2: group all followed ids per pair
def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for pair, group in groupby(data, itemgetter(0)):
        try:
            all_followed = ",".join(
                followed for dummy, followed in group)
            # Write to stdout
            print("%s%s%s" % (pair, separator, all_followed))
        except ValueError:
            pass

if __name__ == "__main__":
    main()
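The second stage reads the first stage's output directory as its input (directory names as assumed above; the stage 2 scripts are saved under the same mapper.py/reducer.py names). To answer the original question for two specific users, grep the result for their pair key; the IDs 1234 and 5678 are only illustrative:

yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input wasbs:///example/followersout -output wasbs:///example/commonout
hdfs dfs -text /example/commonout/part-* | grep '^1234|5678'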