MapReduce experiments on the Azure platform
MapReduce on the Azure platform
Microsofti "Getting started" artikkel
Katsetustes kasutame shelli, Pythonit ja stream-I/O-d, mis ei eelda mingite pakettide ja arenduskeskkondade installimist.
Microsofti Pythoni tutorial
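Since everything flows through stdin and stdout, the scripts can be smoke-tested on any machine before going near the cluster. The sketch below is a hypothetical helper (not from the tutorial): it pipes a sample file through a mapper, a plain line sort standing in for the shuffle, and a reducer, assuming that mapper.py, reducer.py and a small sample.txt sit in the working directory.

# local_test.py -- hypothetical helper, not part of the tutorial.
# Simulates the streaming pipeline "mapper | sort | reducer" locally.
import subprocess

with open("sample.txt") as f:
    mapped = subprocess.run(["python", "mapper.py"], stdin=f,
                            capture_output=True, text=True, check=True).stdout

# The Hadoop shuffle sorts map output by key; a plain line sort is
# close enough for a local smoke test.
shuffled = "".join(line + "\n" for line in sorted(mapped.splitlines()))

reduced = subprocess.run(["python", "reducer.py"], input=shuffled,
                         capture_output=True, text=True, check=True).stdout
print(reduced, end="")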
Experiment: word count
mapper.py
#!/usr/bin/env python
import sys

# 'file' in this case is STDIN
def read_input(file):
    # Split each line into words
    for line in file:
        yield line.split()

def main(separator='\t'):
    data = read_input(sys.stdin)
    # Process each list of words returned from read_input
    for words in data:
        # Process each word
        for word in words:
            # Write to STDOUT
            print('%s%s%d' % (word, separator, 1))

if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    # Go through each line
    for line in file:
        # Strip out the separator character
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    # Group words and counts into 'group'
    # Since MapReduce is a distributed process, each word
    # may have multiple counts. 'group' will have all counts
    # which can be retrieved using the word as the key.
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            # For each word, pull the count(s) for the word
            # from 'group' and create a total count
            total_count = sum(int(count) for current_word, count in group)
            # Write to stdout
            print("%s%s%d" % (current_word, separator, total_count))
        except ValueError:
            # Count was not a number, so do nothing
            pass

if __name__ == "__main__":
    main()
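One detail worth noting about the reducer above: itertools.groupby only merges consecutive identical keys, so it produces one group per word only because Hadoop sorts the map output by key before the reduce phase. A quick illustration with made-up (word, count) pairs:

from itertools import groupby
from operator import itemgetter

# Made-up map output as the reducer would see it.
pairs = [("apple", "1"), ("pear", "1"), ("apple", "1")]

# Unsorted: groupby sees two separate 'apple' runs.
print([k for k, g in groupby(pairs, itemgetter(0))])            # ['apple', 'pear', 'apple']

# Sorted, as the shuffle guarantees: one group per word.
print([k for k, g in groupby(sorted(pairs), itemgetter(0))])    # ['apple', 'pear']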
Some input files are already available:
hdfs dfs -ls /example/data/
Let's run the job on them; /example is writable storage:
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input wasbs:///example/data/gutenberg/davinci.txt -output wasbs:///example/wordcountout
Output:
hdfs dfs -text /example/wordcountout/part-00000 | head -50
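If the result is copied off the cluster (for example with hdfs dfs -get /example/wordcountout/part-00000 .), the tab-separated output is easy to post-process locally. A small sketch (the local file name is assumed) that prints the 20 most frequent words:

# top_words.py -- illustration; assumes part-00000 has been copied
# into the current directory.
import heapq

counts = {}
with open("part-00000") as f:
    for line in f:
        word, count = line.rstrip("\n").split("\t", 1)
        counts[word] = int(count)

# The 20 most frequent words and their counts.
for word, count in heapq.nlargest(20, counts.items(), key=lambda kv: kv[1]):
    print(word, count)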
Experiment: sort
mapper.py
#!/usr/bin/env python
import sys

# 'file' in this case is STDIN
def read_input(file):
    for line in file:
        yield (x.strip() for x in line.split(","))

def main(separator='\t'):
    data = read_input(sys.stdin)
    # For movielens ratings.csv
    for userid, movieid, rating, ts in data:
        hhash = "%s|%s" % (rating, movieid)
        rest = "%s|%s" % (userid, ts)
        print('%s%s%s' % (hhash, separator, rest))

if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
#from itertools import groupby
#from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    # just copy output (assuming it might be fed to other
    # reducers)
    for hhash, rest in data:
        print("%s%s%s" % (hhash, separator, rest))

if __name__ == "__main__":
    main()
Test data: http://files.grouplens.org/datasets/movielens/ml-20m.zip
Copy it into HDFS:
hdfs dfs -put ml-20m/ratings.csv /example/data/
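The reducer in this experiment is just an identity step: the real work is done by the shuffle, which orders the composite rating|movieid keys that the mapper emits. By default, streaming compares keys as plain text, which is worth keeping in mind; a quick illustration with made-up MovieLens-style rows:

# key_order_demo.py -- illustration only; the rows are made up.
rows = [
    ("1", "1029", "3.0", "1112484676"),   # userid, movieid, rating, timestamp
    ("1", "31", "2.5", "1112486027"),
    ("2", "10", "4.0", "835355493"),
]

# The same composite key the mapper builds: rating first, then movieid.
keys = ["%s|%s" % (rating, movieid) for userid, movieid, rating, ts in rows]

# The shuffle effectively performs this string sort before the reducer runs.
for key in sorted(keys):
    print(key)
# Prints 2.5|31, 3.0|1029, 4.0|10 -- lexicographic order, which matches
# numeric order for the rating (0.5-5.0) but not for the movieid part.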
Experiment: common interests
Based on social media data: who follows whom on Twitter. We find the common interests of two arbitrary users.
Test data: https://snap.stanford.edu/data/higgs-social_network.edgelist.gz
Step one: gather the followers together
mapper.py
#!/usr/bin/env python
import sys

# 'file' in this case is STDIN
def read_input(file):
    for line in file:
        yield (x.strip() for x in line.split(" ", 1))

# stage 1: map 'followed' to key
def main(separator='\t'):
    data = read_input(sys.stdin)
    for follower, followed in data:
        print('%s%s%s' % (followed, separator, follower))

if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

# stage 1: group all followers
def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for followed, group in groupby(data, itemgetter(0)):
        try:
            # remove dupes (set will do that automagically
            # and efficiently)
            follower_set = set(follower for dummy, follower in group)
            all_followers = ",".join(list(follower_set))
            # Write to stdout
            print("%s%s%s" % (followed, separator, all_followers))
        except ValueError:
            pass

if __name__ == "__main__":
    main()
Step two: find who jointly follows whom
mapper.py
#!/usr/bin/env python
import sys

# 'file' in this case is STDIN
def read_input(file):
    for line in file:
        followed, all_fw = line.rstrip().split("\t", 1)
        yield (followed, all_fw.split(","))

# stage 2: all pairs of followers
def main(separator='\t'):
    data = read_input(sys.stdin)
    for followed, all_fw in data:
        fwc = len(all_fw)
        for i in range(fwc - 1):
            for j in range(i + 1, fwc):
                # emit pair, smaller id first
                if all_fw[i] < all_fw[j]:
                    pair = "%s|%s" % (all_fw[i], all_fw[j])
                else:
                    pair = "%s|%s" % (all_fw[j], all_fw[i])
                print('%s%s%s' % (pair, separator, followed))

if __name__ == "__main__":
    main()
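The nested loops above emit one record per unordered pair of followers, always putting the smaller id first (by string comparison) so that both orderings end up under the same reduce key. Note that an account with n followers yields n*(n-1)/2 pairs, so very popular accounts inflate the intermediate data quadratically. A tiny illustration with made-up ids:

# pair_demo.py -- illustration with made-up follower ids.
all_fw = ["42", "7", "19"]

pairs = []
fwc = len(all_fw)
for i in range(fwc - 1):
    for j in range(i + 1, fwc):
        # Canonical ordering (string comparison, as in the mapper):
        # "7|42" and "42|7" would otherwise be different keys.
        a, b = sorted((all_fw[i], all_fw[j]))
        pairs.append("%s|%s" % (a, b))

print(pairs)   # ['42|7', '19|42', '19|7'] -- 3 = 3*2/2 pairs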
reducer.py
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

# stage 2: group all followed ids per pair
def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for pair, group in groupby(data, itemgetter(0)):
        try:
            all_followed = ",".join(followed for dummy, followed in group)
            # Write to stdout
            print("%s%s%s" % (pair, separator, all_followed))
        except ValueError:
            pass

if __name__ == "__main__":
    main()
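For a quick end-to-end check without a cluster, both stages can be simulated in plain Python on a handful of made-up edges. The sketch below mirrors the same logic as the two jobs: group the followers of each account, then collect the commonly followed accounts for every follower pair (the edge list is interpreted as in the stage 1 mapper: first field = follower, second field = followed).

# common_interests_demo.py -- illustration; the edge list is made up.
from collections import defaultdict

# "follower followed" edges.
edges = [("1", "3"), ("2", "3"), ("1", "4"), ("2", "4"), ("5", "4")]

# Stage 1: deduplicated followers per followed account.
followers = defaultdict(set)
for follower, followed in edges:
    followers[followed].add(follower)

# Stage 2: for each unordered follower pair, the accounts both follow.
common = defaultdict(list)
for followed, fws in followers.items():
    fws = sorted(fws)
    for i in range(len(fws) - 1):
        for j in range(i + 1, len(fws)):
            common["%s|%s" % (fws[i], fws[j])].append(followed)

for pair, followed_list in sorted(common.items()):
    print("%s\t%s" % (pair, ",".join(followed_list)))
# Users 1 and 2 both follow 3 and 4, so the first line is "1|2<TAB>3,4".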