Mapreduce katsetused Azure platvormil

Allikas: Lambda

MapReduce Azure platvormil

Microsofti "Getting started" artikkel

Katsetustes kasutame shelli, Pythonit ja stream-I/O-d, mis ei eelda mingite pakettide ja arenduskeskkondade installimist.

Microsofti Pythoni tutorial

hdfs käsud

Katsetus: word count

mapper.py

#!/usr/bin/env python

import sys

# 'file' in this case is STDIN
def read_input(file):
    # Split each line into words
    for line in file:
        yield line.split()

def main(separator='\t'):
    data = read_input(sys.stdin)
    # Process each words returned from read_input
    for words in data:
        # Process each word
        for word in words:
            # Write to STDOUT
            print('%s%s%d' % (word, separator, 1))

if __name__ == "__main__":
    main()

reducer.py

#!/usr/bin/env python

from itertools import groupby
from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    # Go through each line
    for line in file:
        # Strip out the separator character
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    # Group words and counts into 'group'
    #   Since MapReduce is a distributed process, each word
    #   may have multiple counts. 'group' will have all counts
    #   which can be retrieved using the word as the key.
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            # For each word, pull the count(s) for the word
            #   from 'group' and create a total count
            total_count = sum(int(count) for current_word, count in group)
            # Write to stdout
            print("%s%s%d" % (current_word, separator, total_count))
        except ValueError:
            # Count was not a number, so do nothing
            pass

if __name__ == "__main__":
    main()


Mingid sisendfailid on juba olemas:

 hdfs dfs -ls /example/data/

Tõmbame nende pealt käima; /example on kirjutatav storage.

 yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input wasbs:///example/data/gutenberg/davinci.txt -output wasbs:///example/wordcountout

Väljund

 hdfs dfs -text /example/wordcountout/part-00000 | head -50

Katsetus: sort

mapper.py

#!/usr/bin/env python

import sys

# 'file' in this case is STDIN
def read_input(file):
    """Yield, per line, an iterator over its stripped comma-separated fields."""
    for line in file:
        yield (field.strip() for field in line.split(","))

def main(separator='\t'):
    """Re-key movielens ratings.csv rows as "rating|movieid<sep>userid|ts".

    Keying on rating (then movie id) lets the framework's shuffle sort
    order the records by rating.
    """
    for userid, movieid, rating, ts in read_input(sys.stdin):
        key = "%s|%s" % (rating, movieid)
        value = "%s|%s" % (userid, ts)
        print('%s%s%s' % (key, separator, value))

if __name__ == "__main__":
    main()

reducer.py

#!/usr/bin/env python

#from itertools import groupby
#from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    """Yield [key, rest] pairs: each line split once on *separator*."""
    return (line.rstrip().split(separator, 1) for line in file)

def main(separator='\t'):
    """Identity reducer: re-emit every key/value record unchanged.

    Kept as a pass-through so the output could be fed to further reducers.
    """
    for key, value in read_mapper_output(sys.stdin, separator=separator):
        print("%s%s%s" % (key, separator, value))

if __name__ == "__main__":
    main()

Testandmed: http://files.grouplens.org/datasets/movielens/ml-20m.zip

Kopeeri hdfs-i:

 hdfs dfs -put ml-20m/ratings.csv /example/data/

Katsetus: ühised huvid

Sotsiaalmeedia andmete pealt: kes jälgib keda Twitteris. Leiame kahe suvalise kasutaja ühised huvid.

Testandmed: https://snap.stanford.edu/data/higgs-social_network.edgelist.gz

Esimene samm: koonda jälgijad kokku

mapper.py

#!/usr/bin/env python

import sys

# 'file' in this case is STDIN
def read_input(file):
    """Yield, per line, an iterator of the two stripped space-separated fields."""
    for line in file:
        yield (part.strip() for part in line.split(" ", 1))

def main(separator='\t'):
    """Stage 1: invert each "follower followed" edge, keying on the followed id."""
    for follower, followed in read_input(sys.stdin):
        print('%s%s%s' % (followed, separator, follower))

if __name__ == "__main__":
    main()

reducer.py

#!/usr/bin/env python

from itertools import groupby
from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

# stage 1: group all followers
def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for followed, group in groupby(data, itemgetter(0)):
        try:
            # remove dupes (set will do that automagically
            # and efficiently)
            follower_set = set(
                follower for dummy, follower in group)
            all_followers = ",".join(list(follower_set))
            # Write to stdout
            print("%s%s%s" % (followed, separator, all_followers))
        except ValueError:
            pass

if __name__ == "__main__":
    main()

Teine samm: leia, kes keda ühiselt jälgivad

mapper.py

#!/usr/bin/env python

import sys

# 'file' in this case is STDIN
def read_input(file):
    """Yield (followed, follower_list) tuples parsed from stage-1 output lines."""
    for line in file:
        followed, followers = line.rstrip().split("\t", 1)
        yield (followed, followers.split(","))

# stage 2: all pairs of followers
def main(separator='\t'):
    """Emit every unordered follower pair, keyed canonically as "small|big".

    The lexicographically smaller id always comes first in the key, so both
    orderings of the same pair shuffle to the same reducer key.
    """
    for followed, followers in read_input(sys.stdin):
        n = len(followers)
        for i in range(n - 1):
            a = followers[i]
            for j in range(i + 1, n):
                b = followers[j]
                small, big = (a, b) if a < b else (b, a)
                print('%s%s%s' % ("%s|%s" % (small, big), separator, followed))

if __name__ == "__main__":
    main()

reducer.py

#!/usr/bin/env python

from itertools import groupby
from operator import itemgetter
import sys

# 'file' in this case is STDIN
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

# stage 2: group all followed ids per pair
def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for pair, group in groupby(data, itemgetter(0)):
        try:
            all_followed = ",".join(
                followed for dummy, followed in group)
            # Write to stdout
            print("%s%s%s" % (pair, separator, all_followed))
        except ValueError:
            pass

if __name__ == "__main__":
    main()