Misplaced Pages

MapReduce: Difference between revisions

Article snapshot taken from Wikipedia with creative commons attribution-sharealike license. Give it a read and then ask your questions in the chat. We can research this topic together.
Browse history interactively← Previous editContent deleted Content addedVisualWikitext
Revision as of 18:06, 19 December 2005 editMarudubshinki (talk | contribs)49,641 edits xpand← Previous edit Latest revision as of 18:47, 12 December 2024 edit undoAadirulez8 (talk | contribs)Extended confirmed users44,246 editsm v2.05 - Autofix / Fix errors for CW project (Link equal to linktext)Tag: WPCleaner 
Line 1: Line 1:
{{Short description|Parallel programming model}}
'''MapReduce''' is a programming tool developed by ] in ], in which parallel computations over large (> 1 ]) data sets are performed. The terminology of "Map" and "Reduce", and their general idea, is borrowed from ]s use of the constructs ] and ] in ]. {{ref|map}}
'''MapReduce''' is a ] and an associated implementation for processing and generating ] sets with a ] and ] algorithm on a ].<ref>{{cite web|url=https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html|title=MapReduce Tutorial|access-date=3 July 2019|website=Apache Hadoop}}</ref><ref>{{cite web|url=http://news.cnet.com/8301-10784_3-9955184-7.html|title=Google spotlights data center inner workings|date=30 May 2008|website=cnet.com|access-date=31 May 2008|archive-date=19 October 2013|archive-url=https://web.archive.org/web/20131019063218/http://news.cnet.com/8301-10784_3-9955184-7.html|url-status=dead}}</ref><ref name="GoogleMapReduce">{{cite web|url=http://static.googleusercontent.com/media/research.google.com/es/us/archive/mapreduce-osdi04.pdf|title=MapReduce: Simplified Data Processing on Large Clusters|website=googleusercontent.com}}</ref>


A MapReduce program is composed of a ] ], which performs filtering and sorting (such as sorting students by first name into queues, one queue for each name), and a '']'' method, which performs a summary operation (such as counting the number of students in each queue, yielding name frequencies). The "MapReduce System" (also called "infrastructure" or "framework") orchestrates the processing by ] the distributed servers, running the various tasks in parallel, managing all communications and data transfers between the various parts of the system, and providing for ] and ].
The actual software is implemented by specifying a ''Map'' function that maps key-value pairs to new key-value pairs and a subsequent ''Reduce'' function that consolidates all mapped key-value pairs sharing the same keys to single key-value pairs.


The model is a specialization of the ''split-apply-combine'' strategy for data analysis.<ref>{{Cite journal | doi = 10.18637/jss.v040.i01| title = The split-apply-combine strategy for data analysis| journal = Journal of Statistical Software| volume = 40| pages = 1–29| year = 2011| last1 = Wickham| first1 = Hadley | doi-access = free}}</ref>
==map and reduce==
It is inspired by the ] and ] functions commonly used in ],<ref name="map">"Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages." -, by Jeffrey Dean and Sanjay Ghemawat; from Google Research</ref> although their purpose in the MapReduce framework is not the same as in their original forms.<ref>{{Cite journal | doi = 10.1016/j.scico.2007.07.001| title = Google's Map ''Reduce'' programming model — Revisited| journal = Science of Computer Programming| volume = 70| pages = 1–30| year = 2008| last1 = Lämmel | first1 = R. | doi-access = }}</ref> The key contributions of the MapReduce framework are not the actual map and reduce functions (which, for example, resemble the 1995 ] standard's<ref>http://www.mcs.anl.gov/research/projects/mpi/mpi-standard/mpi-report-2.0/mpi2-report.htm MPI 2 standard</ref> ''reduce''<ref>{{cite web|url=http://mpitutorial.com/tutorials/mpi-reduce-and-allreduce/|title=MPI Reduce and Allreduce · MPI Tutorial|website=mpitutorial.com}}</ref> and ''scatter''<ref>{{cite web|url=http://mpitutorial.com/tutorials/performing-parallel-rank-with-mpi/|title=Performing Parallel Rank with MPI · MPI Tutorial|website=mpitutorial.com}}</ref> operations), but the scalability and fault-tolerance achieved for a variety of applications due to parallelization. As such, a ] implementation of MapReduce is usually not faster than a traditional (non-MapReduce) implementation; any gains are usually only seen with ] implementations on multi-processor hardware.<ref name=stackoverflow>{{cite web
In simpler terms, what a map function does is go over a conceptual list of independent elements (for example, a list of test scores) and performs a specified operation on each element (with the previous example, one might have discovered a flaw in the test that gave each student a score too high by one; one could then define a map function of "minus 1"- it would subtract one from each score, correcting them.); the fact that each element is operated on independently, and that the original list is not being modified because a new list is created to hold the answers means that it is very easy to make a map operation highly parallel, and thus useful in high-performance applications and domains like ].
| url = https://stackoverflow.com/questions/3947889/mongodb-terrible-mapreduce-performance
| title = MongoDB: Terrible MapReduce Performance
| publisher = Stack Overflow
| date = October 16, 2010
| quote = The MapReduce implementation in MongoDB has little to do with map reduce apparently. Because for all I read, it is single-threaded, while map-reduce is meant to be used highly parallel on a cluster. ... MongoDB MapReduce is single threaded on a single server...
}}</ref> The use of this model is beneficial only when the optimized distributed shuffle operation (which reduces network communication cost) and fault tolerance features of the MapReduce framework come into play. Optimizing the communication cost is essential to a good MapReduce algorithm.<ref name="ullman" />


MapReduce ] have been written in many programming languages, with different levels of optimization. A popular ] implementation that has support for distributed shuffles is part of ]. The name MapReduce originally referred to the proprietary ] technology, but has since become a ]. By 2014, Google was no longer using MapReduce as its primary '']'' processing model,<ref>{{cite web|url=http://www.datacenterknowledge.com/archives/2014/06/25/google-dumps-mapreduce-favor-new-hyper-scale-analytics-system/ | title=Google Dumps MapReduce in Favor of New Hyper-Scale Analytics System |last1=Sverdlik |first1=Yevgeniy |date=2014-06-25 |website=Data Center Knowledge |access-date=2015-10-25 |quote="We don't really use MapReduce anymore" }}</ref> and development on ] had moved on to more capable and less disk-oriented mechanisms that incorporated full map and reduce capabilities.<ref>{{cite news |url=https://analyticsindiamag.com/ai-origins-evolution/why-mapreduce-is-still-a-dominant-approach-for-large-scale-machine-learning/ | title=Why MapReduce Is Still A Dominant Approach For Large-Scale Machine Learning | work=Analytics India | date=April 5, 2019}}</ref>
A reduce operation on the other hand, usually takes a list and combines elements appropriately (Continuing the preceding example, what if one wanted to know the class average? One could define a reduce function which halved the size of the list by adding an entry in the list to its neighbor, recursively continuing until there is only one (large) entry, and dividing the total sum by the original entry of elements to get the average); while since a reduce always ends up with a single answer, it is not as parallelizable as a map function, the large number of fairly independent calculations means that reduce functions are still useful in highly parallel environments.

==Overview==
MapReduce is a framework for processing ] problems across large datasets using a large number of computers (nodes), collectively referred to as a ] (if all nodes are on the same local network and use similar hardware) or a ] (if the nodes are shared across geographically and administratively distributed systems, and use more heterogeneous hardware). Processing can occur on data stored either in a ] (unstructured) or in a ] (structured). MapReduce can take advantage of the locality of data, processing it near the place it is stored in order to minimize communication overhead.

A MapReduce framework (or system) is usually composed of three operations (or steps):

# '''Map:''' each worker node applies the <code>map</code> function to the local data, and writes the output to a temporary storage. A master node ensures that only one copy of the redundant input data is processed.
# '''Shuffle:''' worker nodes redistribute data based on the output keys (produced by the <code>map</code> function), such that all data belonging to one key is located on the same worker node.
# '''Reduce:''' worker nodes now process each group of output data, per key, in parallel.

MapReduce allows for the distributed processing of the map and reduction operations. Maps can be performed in parallel, provided that each mapping operation is independent of the others; in practice, this is limited by the number of independent data sources and/or the number of CPUs near each source. Similarly, a set of 'reducers' can perform the reduction phase, provided that all outputs of the map operation that share the same key are presented to the same reducer at the same time, or that the reduction function is ]. While this process often appears inefficient compared to algorithms that are more sequential (because multiple instances of the reduction process must be run), MapReduce can be applied to significantly larger datasets than a single ] can handle&nbsp;&ndash; a large ] can use MapReduce to sort a ] of data in only a few hours.<ref>{{cite web|last=Czajkowski|first=Grzegorz|title=Sorting Petabytes with MapReduce – The Next Episode|url=http://googleresearch.blogspot.com/2011/09/sorting-petabytes-with-mapreduce-next.html|access-date=7 April 2014|author2=Marián Dvorský |author3=Jerry Zhao |author4=Michael Conley |date=7 September 2011 }}</ref> The parallelism also offers some possibility of recovering from partial failure of servers or storage during the operation: if one mapper or reducer fails, the work can be rescheduled&nbsp;&ndash; assuming the input data are still available.

Another way to look at MapReduce is as a 5-step parallel and distributed computation:

# '''Prepare the Map() input''' – the "MapReduce system" designates Map processors, assigns the input key ''K1'' that each processor would work on, and provides that processor with all the input data associated with that key.
# '''Run the user-provided Map() code''' – Map() is run exactly once for each ''K1'' key, generating output organized by key ''K2''.
# '''"Shuffle" the Map output to the Reduce processors''' – the MapReduce system designates Reduce processors, assigns the ''K2'' key each processor should work on, and provides that processor with all the Map-generated data associated with that key.
# '''Run the user-provided Reduce() code''' – Reduce() is run exactly once for each ''K2'' key produced by the Map step.
# '''Produce the final output''' – the MapReduce system collects all the Reduce output, and sorts it by ''K2'' to produce the final outcome.

These five steps can be logically thought of as running in sequence – each step starts only after the previous step is completed – although in practice they can be interleaved as long as the final result is not affected.

In many situations, the input data might have already been distributed (]) among many different servers, in which case step 1 could sometimes be greatly simplified by assigning Map servers that would process the locally present input data. Similarly, step 3 could sometimes be sped up by assigning Reduce processors that are as close as possible to the Map-generated data they need to process.

==Logical view==
The ''Map'' and ''Reduce'' functions of ''MapReduce'' are both defined with respect to data structured in (key, value) pairs. ''Map'' takes one pair of data with a type in one ], and returns a list of pairs in a different domain:

<code>Map(k1,v1)</code> → <code>list(k2,v2)</code>

The ''Map'' function is applied in parallel to every pair (keyed by <code>k1</code>) in the input dataset. This produces a list of pairs (keyed by <code>k2</code>) for each call.
After that, the MapReduce framework collects all pairs with the same key (<code>k2</code>) from all lists and groups them together, creating one group for each key.

The ''Reduce'' function is then applied in parallel to each group, which in turn produces a collection of values in the same domain:

<code>Reduce(k2, list (v2))</code> → <code>list((k3, v3))</code><ref>{{Cite web|url=https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Inputs+and+Outputs|title = MapReduce Tutorial}}</ref>

Each ''Reduce'' call typically produces either one key value pair or an empty return, though one call is allowed to return more than one key value pair. The returns of all calls are collected as the desired result list.

Thus the MapReduce framework transforms a list of (key, value) pairs into another list of (key, value) pairs.<ref>{{Cite web|url=https://github.com/apache/hadoop-mapreduce/blob/307cb5b316e10defdbbc228d8cdcdb627191ea15/src/java/org/apache/hadoop/mapreduce/Reducer.java#L148|title = Apache/Hadoop-mapreduce|website = ]|date = 31 August 2021}}</ref> This behavior is different from the typical functional programming map and reduce combination, which accepts a list of arbitrary values and returns one single value that combines ''all'' the values returned by map.

It is ] to have implementations of the map and reduce abstractions in order to implement MapReduce. Distributed implementations of MapReduce require a means of connecting the processes performing the Map and Reduce phases. This may be a ]. Other options are possible, such as direct streaming from mappers to reducers, or for the mapping processors to serve up their results to reducers that query them.

===Examples===
The canonical MapReduce example counts the appearance of each word in a set of documents:<ref>{{cite web|url=http://research.google.com/archive/mapreduce-osdi04-slides/index-auto-0004.html|title=Example: Count word occurrences|publisher=Google Research|access-date=September 18, 2013}}</ref>

'''function''' <u>map</u>(String name, String document):
''// name: document name''
''// document: document contents''
'''for each''' word w '''in''' document:
emit (w, 1)
'''function''' <u>reduce</u>(String word, Iterator partialCounts):
''// word: a word''
''// partialCounts: a list of aggregated partial counts''
sum = 0
'''for each''' pc '''in''' partialCounts:
sum += pc
emit (word, sum)

Here, each document is split into words, and each word is counted by the ''map'' function, using the word as the result key. The framework puts together all the pairs with the same key and feeds them to the same call to ''reduce''. Thus, this function just needs to sum all of its input values to find the total appearances of that word.

As another example, imagine that for a database of 1.1 billion people, one would like to compute the average number of social contacts a person has according to age. In ], such a query could be expressed as:

<syntaxhighlight lang="sql">
SELECT age, AVG(contacts)
FROM social.person
GROUP BY age
ORDER BY age
</syntaxhighlight>

Using MapReduce, the {{mono|K1}} key values could be the integers 1 through 1100, each representing a batch of 1 million records, the {{mono|K2}} key value could be a person's age in years, and this computation could be achieved using the following functions:

'''function''' Map '''is'''
'''input:''' '''integer''' K1 between 1 and 1100, representing a batch of 1 million social.person records
'''for each''' social.person record in the K1 batch '''do'''
'''let''' Y be the person's age
'''let''' N be the number of contacts the person has
'''produce one output record''' (Y,(N,1))
'''repeat'''
'''end function'''
'''function''' Reduce '''is'''
'''input:''' age (in years) Y
'''for each''' input record (Y,(N,C)) '''do'''
Accumulate in S the sum of N*C
Accumulate in C<sub>new</sub> the sum of C
'''repeat'''
'''let''' A be S/C<sub>new</sub>
'''produce one output record''' (Y,(A,C<sub>new</sub>))
'''end function'''

Note that in the {{mono|Reduce}} function, {{mono|C}} is the count of people having in total N contacts, so in the {{mono|Map}} function it is natural to write {{mono|1=C=1}}, since every output pair is referring to the contacts of one single person.

The MapReduce system would line up the 1100 Map processors, and would provide each with its corresponding 1 million input records. The Map step would produce 1.1 billion {{mono|(Y,(N,1))}} records, with {{mono|Y}} values ranging between, say, 8 and 103. The MapReduce System would then line up the 96 Reduce processors by performing shuffling operation of the key/value pairs due to the fact that we need average per age, and provide each with its millions of corresponding input records. The Reduce step would result in the much reduced set of only 96 output records {{mono|(Y,A)}}, which would be put in the final result file, sorted by {{mono|Y}}.

The count info in the record is important if the processing is reduced more than one time. If we did not add the count of the records, the computed average would be wrong, for example:

''-- map output #1: age, quantity of contacts''
10, 9
10, 9
10, 9

''-- map output #2: age, quantity of contacts''
10, 9
10, 9

''-- map output #3: age, quantity of contacts''
10, 10

If we reduce files {{mono|#1}} and {{mono|#2}}, we will have a new file with an average of 9 contacts for a 10-year-old person ((9+9+9+9+9)/5):

''-- reduce step #1: age, average of contacts''
10, 9

If we reduce it with file {{mono|#3}}, we lose the count of how many records we've already seen, so we end up with an average of 9.5 contacts for a 10-year-old person ((9+10)/2), which is wrong. The correct answer is 9.1<span style="text-decoration: overline;">66</span> = 55 / 6 = (9×3+9×2+10×1)/(3+2+1).

==Dataflow==
] adheres to ] where code is effectively divided into unmodifiable ''frozen spots'' and ] ''hot spots''. The frozen spot of the MapReduce framework is a large distributed sort. The hot spots, which the application defines, are:
* an ''input reader''
* a ''Map'' function
* a ''partition'' function
* a ''compare'' function
* a ''Reduce'' function
* an ''output writer''

===Input reader===
The ''input reader'' divides the input into appropriate size 'splits' (in practice, typically, 64&nbsp;MB to 128&nbsp;MB) and the framework assigns one split to each ''Map'' function. The ''input reader'' reads data from stable storage (typically, a ]) and generates key/value pairs.

A common example will read a directory full of text files and return each line as a record.

===Map function===
The ''Map'' function takes a series of key/value pairs, processes each, and generates zero or more output key/value pairs. The input and output types of the map can be (and often are) different from each other.

If the application is doing a word count, the map function would break the line into words and output a key/value pair for each word. Each output pair would contain the word as the key and the number of instances of that word in the line as the value.

===Partition function===
Each ''Map'' function output is allocated to a particular ''reducer'' by the application's ''partition'' function for ] purposes. The ''partition'' function is given the key and the number of reducers and returns the index of the desired ''reducer''.

A typical default is to ] the key and use the hash value ] the number of ''reducers''. It is important to pick a partition function that gives an approximately uniform distribution of data per shard for ] purposes, otherwise the MapReduce operation can be held up waiting for slow reducers to finish
(i.e. the reducers assigned the larger shares of the non-uniformly partitioned data).

Between the map and reduce stages, the data are ''shuffled'' (parallel-sorted / exchanged between nodes) in order to move the data from the map node that produced them to the shard in which they will be reduced. The shuffle can sometimes take longer than the computation time depending on network bandwidth, CPU speeds, data produced and time taken by map and reduce computations.

===Comparison function===
The input for each ''Reduce'' is pulled from the machine where the ''Map'' ran and sorted using the application's ''comparison'' function.

===Reduce function===
The framework calls the application's ''Reduce'' function once for each unique key in the sorted order. The ''Reduce'' can iterate through the values that are associated with that key and produce zero or more outputs.

In the word count example, the ''Reduce'' function takes the input values, sums them and generates a single output of the word and the final sum.

===Output writer===
The ''Output Writer'' writes the output of the ''Reduce'' to the stable storage.

==Theoretical background==

Properties of ] are the basis for ensuring the validity of MapReduce operations.<ref>{{Cite journal
| doi = 10.1017/S0956796817000193
| title = An algebra for distributed Big Data analytics
| journal = Journal of Functional Programming
| volume = 28
| year = 2017
| last = Fegaras
| first = Leonidas
| s2cid = 44629767
| doi-access =
}}</ref><ref>{{cite arXiv
|last=Lin
|first=Jimmy
|title=Monoidify! Monoids as a Design Principle for Efficient MapReduce Algorithms
|eprint=1304.7544
|date=29 Apr 2013|class=cs.DC
}}</ref>

In the Algebird package<ref>{{Cite web
|title= Abstract Algebra for Scala
|url=https://twitter.github.io/algebird/}}</ref> a Scala implementation of Map/Reduce explicitly requires a monoid class type
.<ref>{{Cite web
|title= Encoding Map-Reduce As A Monoid With Left Folding
|date= 5 September 2016|url= http://erikerlandson.github.io/blog/2016/09/05/expressing-map-reduce-as-a-left-folding-monoid/}}</ref>

The operations of MapReduce deal with two types: the type ''A'' of input data being mapped, and the type ''B'' of output data being reduced.

The ''Map'' operation takes individual values of type ''A'' and produces, for each ''a:A'' a value ''b:B''; The ''Reduce'' operation requires a binary operation • defined on values of type ''B''; it consists of folding all available ''b:B'' to a single value.

From a basic requirements point of view, any MapReduce operation must involve the ability to arbitrarily regroup data being reduced. Such a requirement amounts to two properties of the operation •:
* associativity: (''x'' • ''y'') • ''z'' = ''x'' • (''y'' • ''z'')
* existence of neutral element ''e'' such that ''e'' • ''x'' = ''x'' • ''e'' = ''x'' for every ''x:B''.

The second property guarantees that, when parallelized over multiple nodes, the nodes that don't have any data to process would have no impact on the result.

These two properties amount to having a ] (''B'', •, ''e'') on values of type ''B'' with operation • and with neutral element ''e''.

There's no requirements on the values of type ''A''; an arbitrary function ''A'' &rarr; ''B'' can be used for the ''Map'' operation. This means that we have a ] ''A*'' &rarr; (''B'', •, ''e''). Here ''A*'' denotes a ], also known as the type of lists over ''A''.

The ''Shuffle'' operation per se is not related to the essence of MapReduce; it's needed to distribute calculations over the cloud.

It follows from the above that not every binary ''Reduce'' operation will work in MapReduce. Here are the counter-examples:

* building a tree from subtrees: this operation is not associative, and the result will depend on grouping;
* direct calculation of averages: ''avg'' is also not associative (and it has no neutral element); to calculate an average, one needs to calculate ].

==Performance considerations==
MapReduce programs are not guaranteed to be fast. The main benefit of this programming model is to exploit the optimized shuffle operation of the platform, and only having to write the ''Map'' and ''Reduce'' parts of the program.
In practice, the author of a MapReduce program however has to take the shuffle step into consideration; in particular the partition function and the amount of data written by the ''Map'' function can have a large impact on the performance and scalability. Additional modules such as the ''Combiner'' function can help to reduce the amount of data written to disk, and transmitted over the network. MapReduce applications can achieve sub-linear speedups under specific circumstances.<ref name=":0">{{Cite journal|title = BSP cost and scalability analysis for MapReduce operations|journal = Concurrency and Computation: Practice and Experience|date = 2015-01-01|issn = 1532-0634|pages = 2503–2527|doi = 10.1002/cpe.3628|first1 = Hermes|last1 = Senger|first2 = Veronica|last2 = Gil-Costa|first3 = Luciana|last3 = Arantes|first4 = Cesar A. C.|last4 = Marcondes|first5 = Mauricio|last5 = Marín|first6 = Liria M.|last6 = Sato|first7 = Fabrício A.B.|last7 = da Silva|volume=28|issue = 8|hdl = 10533/147670|s2cid = 33645927|hdl-access = free}}</ref>

When designing a MapReduce algorithm, the author needs to choose a good tradeoff<ref name="ullman">{{Cite journal | doi = 10.1145/2331042.2331053 | title = Designing good MapReduce algorithms| journal = XRDS: Crossroads, the ACM Magazine for Students| volume = 19| pages = 30–34| year = 2012| last1 = Ullman | first1 = J. D. | s2cid = 26498063| author-link1 = Jeffrey Ullman| url = http://xrds.acm.org/article.cfm?aid=2331053 |url-access=subscription}}</ref> between the computation and the communication costs. Communication cost often dominates the computation cost,<ref name="ullman"/><ref name=":0"/> and many MapReduce implementations are designed to write all communication to distributed storage for crash recovery.

In tuning performance of MapReduce, the complexity of mapping, shuffle, sorting (grouping by the key), and reducing has to be taken into account. The amount of data produced by the mappers is a key parameter that shifts the bulk of the computation cost between mapping and reducing. Reducing includes sorting (grouping of the keys) which has nonlinear complexity. Hence, small partition sizes reduce sorting time, but there is a trade-off because having a large number of reducers may be impractical. The influence of split unit size is marginal (unless chosen particularly badly, say <1MB). The gains from some mappers reading load from local disks, on average, is minor.<ref>{{Cite journal|title = Scheduling divisible MapReduce computations|last1 = Berlińska|first1 = Joanna|date = 2010-12-01|journal = Journal of Parallel and Distributed Computing|doi = 10.1016/j.jpdc.2010.12.004|last2 = Drozdowski|first2 = Maciej|volume=71|issue = 3|pages=450–459}}</ref>

For processes that complete quickly, and where the data fits into main memory of a single machine or a small cluster, using a MapReduce framework usually is not effective. Since these frameworks are designed to recover from the loss of whole nodes during the computation, they write interim results to distributed storage. This crash recovery is expensive, and only pays off when the computation involves many computers and a long runtime of the computation. A task that completes in seconds can just be restarted in the case of an error, and the likelihood of at least one machine failing grows quickly with the cluster size. On such problems, implementations keeping all data in memory and simply restarting a computation on node failures or —when the data is small enough— non-distributed solutions will often be faster than a MapReduce system.

==Distribution and reliability==
MapReduce achieves reliability by parceling out a number of operations on the set of data to each node in the network. Each node is expected to report back periodically with completed work and status updates. If a node falls silent for longer than that interval, the master node (similar to the master server in the ]) records the node as dead and sends out the node's assigned work to other nodes. Individual operations use ] operations for naming file outputs as a check to ensure that there are not parallel conflicting threads running. When files are renamed, it is possible to also copy them to another name in addition to the name of the task (allowing for ]).

The reduce operations operate much the same way. Because of their inferior properties with regard to parallel operations, the master node attempts to schedule reduce operations on the same node, or in the same rack as the node holding the data being operated on. This property is desirable as it conserves bandwidth across the backbone network of the datacenter.

Implementations are not necessarily highly reliable. For example, in older versions of ] the ''NameNode'' was a ] for the distributed filesystem. Later versions of Hadoop have high availability with an active/passive failover for the "NameNode."


==Uses== ==Uses==
MapReduce is useful in a wide range of applications, including distributed pattern-based searching, distributed sorting, web link-graph reversal, Singular Value Decomposition,<ref>{{cite web |last1=Bosagh Zadeh|first1=Reza|last2=Carlsson|first2=Gunnar|title=Dimension Independent Matrix Square Using MapReduce |website=Stanford University |url=https://stanford.edu/~rezab/papers/dimsum.pdf|access-date=12 July 2014|bibcode=2013arXiv1304.1467B|year=2013|arxiv=1304.1467}}</ref> web access log stats, ] construction, ], ],<ref name="mrml">{{cite web| url=http://www.willowgarage.com/map-reduce-machine-learning-multicore| title=Map-Reduce for Machine Learning on Multicore|first1=Andrew Y. |last1=Ng |first2=Gary |last2=Bradski |first3=Cheng-Tao |last3=Chu |first4=Kunle |last4=Olukotun |first5=Sang Kyun |last5=Kim |first6=Yi-An |last6=Lin |first7=YuanYuan |last7=Yu| publisher=NIPS 2006| year=2006}}</ref> and ]. Moreover, the MapReduce model has been adapted to several computing environments like multi-core and many-core systems,<ref name="evalMR">{{Cite book | doi = 10.1109/HPCA.2007.346181| chapter = Evaluating MapReduce for Multi-core and Multiprocessor Systems| title = 2007 IEEE 13th International Symposium on High Performance Computer Architecture| pages = 13| year = 2007| last1 = Ranger | first1 = C. | last2 = Raghuraman | first2 = R. | last3 = Penmetsa | first3 = A. | last4 = Bradski | first4 = G. | last5 = Kozyrakis | first5 = C. | isbn = 978-1-4244-0804-7| citeseerx = 10.1.1.220.8210| s2cid = 12563671}}</ref><ref name="graphicsMR">{{Cite book | doi = 10.1145/1454115.1454152| chapter = Mars: a MapReduce framework on graphics processors| title = Proceedings of the 17th international conference on Parallel architectures and compilation techniques – PACT '08| pages = 260| year = 2008| last1 = He | first1 = B. | last2 = Fang | first2 = W. | last3 = Luo | first3 = Q. | last4 = Govindaraju | first4 = N. K. | chapter-url = http://wenbin.org/doc/papers/Wenbin08PACT.pdf| last5 = Wang | first5 = T. | isbn = 9781605582825| s2cid = 207169888}}</ref><ref name="tiledMR">{{Cite book | doi = 10.1145/1854273.1854337| chapter = Tiled-MapReduce: optimizing resource usages of data-parallel applications on multicore with tiling| title = Proceedings of the 19th international conference on Parallel architectures and compilation techniques – PACT '10| pages = 523| year = 2010| last1 = Chen | first1 = R. | last2 = Chen | first2 = H. | last3 = Zang | first3 = B. | isbn = 9781450301787| s2cid = 2082196}}</ref> desktop grids,<ref name="gridMR">{{Cite book | doi = 10.1109/3PGCIC.2010.33| chapter = Towards MapReduce for Desktop Grid Computing| title = 2010 International Conference on P2P, Parallel, Grid, Cloud and Internet Computing| pages = 193| year = 2010| last1 = Tang | first1 = B. | last2 = Moca | first2 = M. | last3 = Chevalier | first3 = S. | last4 = He | first4 = H. | last5 = Fedak | first5 = G. | chapter-url = http://graal.ens-lyon.fr/~gfedak/papers/xtremmapreduce.3pgcic10.pdf| isbn = 978-1-4244-8538-3| citeseerx = 10.1.1.671.2763| s2cid = 15044391}}</ref>
According to Google, they use MapReduce in a wide range of applications, including: "distributed ], distributed sort, web link-graph reversal, term-vector per host, web access log stats inverted index construction, document clustering, ], statistical ]..."
multi-cluster,<ref name="HMR">{{Cite book | doi = 10.1145/1996023.1996026| chapter = A Hierarchical Framework for Cross-Domain MapReduce Execution|chapter-url = http://yuanluo.net/publications/LUO_ECMLS2011.pdf| title = Proceedings of the second international workshop on Emerging computational methods for the life sciences (ECMLS '11)| year = 2011| last1 = Luo | first1 = Y. | last2 = Guo | first2 = Z. | last3 = Sun | first3 = Y.| last4 = Plale | first4 = B. |author4-link=Beth Plale| last5 = Qiu | first5 = J. | last6=Li|
first6=W. |isbn = 978-1-4503-0702-4| citeseerx = 10.1.1.364.9898| s2cid = 15179363}}</ref> volunteer computing environments,<ref name="volunteerMR">{{Cite book | doi = 10.1145/1851476.1851489| chapter = MOON: MapReduce On Opportunistic eNvironments| title = Proceedings of the 19th ACM International Symposium on High Performance Distributed Computing – HPDC '10| pages = 95| year = 2010| last1 = Lin | first1 = H. | last2 = Ma | first2 = X. | last3 = Archuleta | first3 = J. | last4 = Feng | first4 = W. C. | last5 = Gardner | first5 = M. | last6 = Zhang | first6 = Z. | chapter-url = http://eprints.cs.vt.edu/archive/00001089/01/moon.pdf| isbn = 9781605589428| s2cid = 2351790}}</ref> dynamic cloud environments,<ref name="dynCloudMR">{{Cite journal| doi = 10.1016/j.jcss.2011.12.021| title = P2P-MapReduce: Parallel data processing in dynamic Cloud environments| journal = ]| volume = 78| issue = 5| pages = 1382–1402| year = 2012| last1 = Marozzo| first1 = F.| last2 = Talia| first2 = D.| last3 = Trunfio| first3 = P.| doi-access = free}}</ref> mobile environments,<ref name="mobileMR">{{Cite book | doi = 10.1145/1839294.1839332| chapter = Misco: a MapReduce framework for mobile systems| title = Proceedings of the 3rd International Conference on PErvasive Technologies Related to Assistive Environments – PETRA '10| pages = 1| year = 2010| last1 = Dou | first1 = A. | last2 = Kalogeraki | first2 = V. | last3 = Gunopulos | first3 = D. | last4 = Mielikainen | first4 = T. | last5 = Tuulos | first5 = V. H. | isbn = 9781450300711| s2cid = 14517696}}</ref> and high-performance computing environments.<ref>{{cite book|chapter=Characterization and Optimization of Memory-Resident MapReduce on HPC Systems|publisher=IEEE|date=May 2014|doi=10.1109/IPDPS.2014.87|isbn=978-1-4799-3800-1|title=2014 IEEE 28th International Parallel and Distributed Processing Symposium|last1=Wang|first1=Yandong|last2=Goldstone|first2=Robin|last3=Yu|first3=Weikuan|last4=Wang|first4=Teng|pages=799–808|s2cid=11157612}}</ref>


At Google, MapReduce was used to completely regenerate Google's index of the ]. It replaced the old ''ad hoc'' programs that updated the index and ran the various analyses.<ref name="usage">{{cite web| quote=As of October, Google was running about 3,000 computing jobs per day through MapReduce, representing thousands of machine-days, according to a presentation by Dean. Among other things, these batch routines analyze the latest Web pages and update Google's indexes.| url=http://www.baselinemag.com/c/a/Infrastructure/How-Google-Works-1/5| title=How Google Works| date=7 July 2006| publisher=baselinemag.com}}</ref> Development at Google has since moved on to technologies such as Percolator, FlumeJava<ref name="Chambers2010">{{cite book |last1=Chambers |first1=Craig |last2=Raniwala |first2=Ashish |last3=Perry |first3=Frances |last4=Adams |first4=Stephen |last5=Henry |first5=Robert R. |last6=Bradshaw |first6=Robert |last7=Weizenbaum |first7=Nathan |title=Proceedings of the 31st ACM SIGPLAN Conference on Programming Language Design and Implementation |chapter=FlumeJava |date=1 January 2010 |pages=363–375 |doi=10.1145/1806596.1806638 |url=https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/35650.pdf |access-date=4 August 2016 |isbn=9781450300193 |s2cid=14888571 |archive-url=https://web.archive.org/web/20160923141630/https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/35650.pdf |archive-date=23 September 2016 }}</ref> and ] that offer streaming operation and updates instead of batch processing, to allow integrating "live" search results without rebuilding the complete index.<ref>Peng, D., & Dabek, F. (2010, October). Large-scale Incremental Processing Using Distributed Transactions and Notifications. In OSDI (Vol. 10, pp. 1-15).</ref>
MapReduce is used in conjunction with ], for greater parallelization.


MapReduce's stable inputs and outputs are usually stored in a ]. The transient data are usually stored on local disk and fetched remotely by the reducers.
==Other Implementations==
The ] project has developed an experimental implementation of MapReduce.


==References== ==Criticism==
* Dean, Jeffrey & Ghemawat, Sanjay (2004). . Retrieved Apr. 6, 2005.


===Lack of novelty===
{{note|map}} ''"Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages."'' -"MapReduce: Simplified Data Processing on Large Clusters"
] and ], computer scientists specializing in ]s and ]s, have been critical of the breadth of problems that MapReduce can be used for.<ref name="shark">{{cite web| url=http://typicalprogrammer.com/relational-database-experts-jump-the-mapreduce-shark| title=Database Experts Jump the MapReduce Shark}}</ref> They called its interface too low-level and questioned whether it really represents the ] its proponents have claimed it is.<ref name="ddandms1">{{cite web| url=http://craig-henderson.blogspot.com/2009/11/dewitt-and-stonebrakers-mapreduce-major.html| title=MapReduce: A major step backwards| author=David DeWitt|author2=Michael Stonebraker | publisher=craig-henderson.blogspot.com| access-date=2008-08-27| author2-link=Michael Stonebraker| author-link=David DeWitt}}</ref> They challenged the MapReduce proponents' claims of novelty, citing ] as an example of ] that has existed for over two decades. They also compared MapReduce programmers to ] programmers, noting both are "writing in a ] performing low-level record manipulation."<ref name="ddandms1"/> MapReduce's use of input files and lack of ] support prevents the performance improvements enabled by common database system features such as ]s and ], though projects such as ], ], ],<ref name="ApacheHiveWiki">{{cite web| url=https://cwiki.apache.org/confluence/display/Hive/Home| title=Apache Hive – Index of – Apache Software Foundation}}</ref> ]<ref name="HBase">{{cite web| url=http://hbase.apache.org/| title=HBase – HBase Home – Apache Software Foundation}}</ref> and ]<ref name="HBase"/><ref name="BigtablePaper">{{cite web| url=http://research.google.com/archive/bigtable-osdi06.pdf| title=Bigtable: A Distributed Storage System for Structured Data}}</ref> are addressing some of these problems.

Greg Jorgensen wrote an article rejecting these views.<ref name="gj1">{{cite web| url=http://typicalprogrammer.com/relational-database-experts-jump-the-mapreduce-shark| title=Relational Database Experts Jump The MapReduce Shark| author=Greg Jorgensen | publisher=typicalprogrammer.com| access-date=2009-11-11| author-link=Greg Jorgensen}}</ref> Jorgensen asserts that DeWitt and Stonebraker's entire analysis is groundless as MapReduce was never designed nor intended to be used as a database.

DeWitt and Stonebraker have subsequently published a detailed benchmark study in 2009 comparing performance of ] MapReduce and ] approaches on several specific problems.<ref name="sigmod">{{cite web| url=http://database.cs.brown.edu/projects/mapreduce-vs-dbms| title=A Comparison of Approaches to Large-Scale Data Analysis|first1=Andrew |last1=Pavlo |first2=Erik |last2=Paulson |first3=Alexander |last3=Rasin |first4=Daniel J. |last4=Abadi |first5=Deavid J. |last5=DeWitt |first6=Samuel |last6=Madden |first7=Michael |last7=Stonebraker| publisher=Brown University| access-date=2010-01-11}}</ref> They concluded that relational databases offer real advantages for many kinds of data use, especially on complex processing or where the data is used across an enterprise, but that MapReduce may be easier for users to adopt for simple or one-time processing tasks.

The MapReduce programming paradigm was also described in ]'s 1985 thesis <ref name="WDHmit86">{{cite book |author-first=W. Danny |author-last=Hillis |date=1986 |title=The Connection Machine |publisher=] |isbn=0262081571 |url-access=registration |url=https://archive.org/details/connectionmachin00hill }}</ref> intended for use on the ], where it was called "xapping/reduction"<ref>{{cite web |url=http://bitsavers.trailing-edge.com/pdf/thinkingMachines/CM2/HA87-4_Connection_Machine_Model_CM-2_Technical_Summary_Apr1987.pdf |title=Connection Machine Model CM-2 Technical Summary |author=<!--Not stated--> |date=1987-04-01 |publisher=] |access-date=2022-11-21}}</ref> and relied upon that machine's special hardware to accelerate both map and reduce. The dialect ultimately used for the Connection Machine, the 1986 ], had parallel <code>*map</code> and <code>reduce!!</code>,<ref>{{cite web |url=https://www.softwarepreservation.org/projects/LISP/starlisp/supplement-to-the-starlisp-reference-manual-version-5-0.pdf |title=Supplement to the *Lisp Reference Manual |author=<!--Not stated--> |date=1988-09-01 |publisher=] |access-date=2022-11-21}}</ref> which in turn was based on the 1984 ], which had non-parallel <code>map</code> and <code>reduce</code> built in.<ref>{{cite web |url=https://collections.lib.utah.edu/dl_files/20/2e/202ebf04b52d043c78297444bc9bc4fbc17b6b5e.pdf |title=Rediflow Architecture Prospectus |author=<!--Not stated--> |date=1986-04-05 |publisher=] |access-date=2022-11-21}}</ref> The ] approach that the Connection Machine's ] uses to execute <code>reduce</code> in <math>O(\log n)</math> time<ref>{{cite book |url=https://www.cise.ufl.edu/~sahni/papers/imagemono.pdf#page=20 |title=Hypercube Algorithms for Image Processing and Pattern Recognition |last=Ranka |first=Sanjay |date=1989 |access-date=2022-12-08 |section=2.6 Data Sum |publisher=University of Florida}}</ref> is effectively the same as the approach referred to within the Google paper as prior work.{{r|GoogleMapReduce|p=11|q=an associative function can be computed over all prefixes of an N element array in log N time on N processors using parallel prefix computations. MapReduce can be considered a simplification and distillation of some of these models}}

In 2010 Google was granted what is described as a patent on MapReduce. The patent, filed in 2004, may cover use of MapReduce by open source software such as ], ], and others. In '']'', an editor acknowledged Google's role in popularizing the MapReduce concept, but questioned whether the patent was valid or novel.<ref>{{cite news |last1=Paul |first1=Ryan |title=Google's MapReduce patent: what does it mean for Hadoop? |url=https://arstechnica.com/information-technology/2010/01/googles-mapreduce-patent-what-does-it-mean-for-hadoop/ |access-date=21 March 2021 |work=Ars Technica |date=20 January 2010 |language=en-us}}</ref><ref name="patent">{{cite web|url=http://patft.uspto.gov/netacgi/nph-Parser?Sect1=PTO1&Sect2=HITOFF&d=PALL&p=1&u=/netahtml/PTO/srchnum.htm&r=1&f=G&l=50&s1=7,650,331.PN.&OS=PN/7,650,331&RS=PN/7,650,331|title=United States Patent: 7650331 - System and method for efficient large-scale data processing|website=uspto.gov}}</ref> In 2013, as part of its "Open Patent Non-Assertion (OPN) Pledge", Google pledged to only use the patent defensively.<ref>{{cite news |last1=Nazer |first1=Daniel |title=Google Makes Open Patent Non-assertion Pledge and Proposes New Licensing Models |url=https://www.eff.org/deeplinks/2013/03/google-makes-open-patent-non-assertion-pledge |access-date=21 March 2021 |work=Electronic Frontier Foundation |date=28 March 2013 |language=en}}</ref><ref>{{cite news |last1=King |first1=Rachel |title=Google expands open patent pledge to 79 more about data center management |url=https://www.zdnet.com/article/google-expands-open-patent-pledge-to-79-more-about-data-center-management/ |access-date=21 March 2021 |work=ZDNet |date=2013 |language=en}}</ref> The patent is expected to expire on 23 December 2026.<ref>{{cite web |title=System and method for efficient large-scale data processing |url=https://patents.google.com/patent/US7650331B1/en |publisher=Google Patents Search |access-date=21 March 2021 |language=en |date=18 June 2004}}</ref>

===Restricted programming framework===
MapReduce tasks must be written as acyclic dataflow programs, i.e. a stateless mapper followed by a stateless reducer, that are executed by a batch job scheduler. This paradigm makes repeated querying of datasets difficult and imposes limitations that are felt in fields such as ] processing<ref>{{cite conference |url=https://csc.csudh.edu/btang/seminar/papers/BigD399.pdf |title=Map-Based Graph Analysis on MapReduce |last1=Gupta |first1=Upa |last2=Fegaras |first2=Leonidas |date=2013-10-06 |publisher=] |book-title=Proceedings: 2013 IEEE International Conference on Big Data |pages=24–30 |location=] |conference=2013 IEEE International Conference on Big Data}}</ref> where iterative algorithms that revisit a single ] multiple times are the norm, as well as, in the presence of ]-based data with high ], even the field of ] where multiple passes through the data are required even though algorithms can tolerate serial access to the data each pass.<ref>{{cite conference |first1=Matei| last1=Zaharia| first2=Mosharaf |last2=Chowdhury| first3=Michael| last3=Franklin| first4=Scott| last4=Shenker| first5=Ion| last5=Stoica |title=Spark: Cluster Computing with Working Sets |url=https://amplab.cs.berkeley.edu/wp-content/uploads/2011/06/Spark-Cluster-Computing-with-Working-Sets.pdf |conference=HotCloud 2010|date=June 2010}}</ref>

==See also==

* ]
* ]

===Implementations of MapReduce===
* ]
* ]
* ]
* ]

==References==
{{reflist|30em}}


{{Commons category|MapReduce}}
==External link==
*- a paper on an internal tool at Google, Sawzall, which acts as an interface to MapReduce, intended to make MapReduce much easier to use.
* on ''Lambda the Ultimate''.


] {{Google LLC}}
{{Authority control}}
]


{{DEFAULTSORT:Mapreduce}}
]
]
]
]
]

Latest revision as of 18:47, 12 December 2024

Parallel programming model

MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel and distributed algorithm on a cluster.

A MapReduce program is composed of a map procedure, which performs filtering and sorting (such as sorting students by first name into queues, one queue for each name), and a reduce method, which performs a summary operation (such as counting the number of students in each queue, yielding name frequencies). The "MapReduce System" (also called "infrastructure" or "framework") orchestrates the processing by marshalling the distributed servers, running the various tasks in parallel, managing all communications and data transfers between the various parts of the system, and providing for redundancy and fault tolerance.

The model is a specialization of the split-apply-combine strategy for data analysis. It is inspired by the map and reduce functions commonly used in functional programming, although their purpose in the MapReduce framework is not the same as in their original forms. The key contributions of the MapReduce framework are not the actual map and reduce functions (which, for example, resemble the 1995 Message Passing Interface standard's reduce and scatter operations), but the scalability and fault-tolerance achieved for a variety of applications due to parallelization. As such, a single-threaded implementation of MapReduce is usually not faster than a traditional (non-MapReduce) implementation; any gains are usually only seen with multi-threaded implementations on multi-processor hardware. The use of this model is beneficial only when the optimized distributed shuffle operation (which reduces network communication cost) and fault tolerance features of the MapReduce framework come into play. Optimizing the communication cost is essential to a good MapReduce algorithm.

MapReduce libraries have been written in many programming languages, with different levels of optimization. A popular open-source implementation that has support for distributed shuffles is part of Apache Hadoop. The name MapReduce originally referred to the proprietary Google technology, but has since become a generic trademark. By 2014, Google was no longer using MapReduce as its primary big data processing model, and development on Apache Mahout had moved on to more capable and less disk-oriented mechanisms that incorporated full map and reduce capabilities.

Overview

MapReduce is a framework for processing parallelizable problems across large datasets using a large number of computers (nodes), collectively referred to as a cluster (if all nodes are on the same local network and use similar hardware) or a grid (if the nodes are shared across geographically and administratively distributed systems, and use more heterogeneous hardware). Processing can occur on data stored either in a filesystem (unstructured) or in a database (structured). MapReduce can take advantage of the locality of data, processing it near the place it is stored in order to minimize communication overhead.

A MapReduce framework (or system) is usually composed of three operations (or steps):

  1. Map: each worker node applies the map function to the local data, and writes the output to a temporary storage. A master node ensures that only one copy of the redundant input data is processed.
  2. Shuffle: worker nodes redistribute data based on the output keys (produced by the map function), such that all data belonging to one key is located on the same worker node.
  3. Reduce: worker nodes now process each group of output data, per key, in parallel.

MapReduce allows for the distributed processing of the map and reduction operations. Maps can be performed in parallel, provided that each mapping operation is independent of the others; in practice, this is limited by the number of independent data sources and/or the number of CPUs near each source. Similarly, a set of 'reducers' can perform the reduction phase, provided that all outputs of the map operation that share the same key are presented to the same reducer at the same time, or that the reduction function is associative. While this process often appears inefficient compared to algorithms that are more sequential (because multiple instances of the reduction process must be run), MapReduce can be applied to significantly larger datasets than a single "commodity" server can handle – a large server farm can use MapReduce to sort a petabyte of data in only a few hours. The parallelism also offers some possibility of recovering from partial failure of servers or storage during the operation: if one mapper or reducer fails, the work can be rescheduled – assuming the input data are still available.

Another way to look at MapReduce is as a 5-step parallel and distributed computation:

  1. Prepare the Map() input – the "MapReduce system" designates Map processors, assigns the input key K1 that each processor would work on, and provides that processor with all the input data associated with that key.
  2. Run the user-provided Map() code – Map() is run exactly once for each K1 key, generating output organized by key K2.
  3. "Shuffle" the Map output to the Reduce processors – the MapReduce system designates Reduce processors, assigns the K2 key each processor should work on, and provides that processor with all the Map-generated data associated with that key.
  4. Run the user-provided Reduce() code – Reduce() is run exactly once for each K2 key produced by the Map step.
  5. Produce the final output – the MapReduce system collects all the Reduce output, and sorts it by K2 to produce the final outcome.

These five steps can be logically thought of as running in sequence – each step starts only after the previous step is completed – although in practice they can be interleaved as long as the final result is not affected.

In many situations, the input data might have already been distributed ("sharded") among many different servers, in which case step 1 could sometimes be greatly simplified by assigning Map servers that would process the locally present input data. Similarly, step 3 could sometimes be sped up by assigning Reduce processors that are as close as possible to the Map-generated data they need to process.

Logical view

The Map and Reduce functions of MapReduce are both defined with respect to data structured in (key, value) pairs. Map takes one pair of data with a type in one data domain, and returns a list of pairs in a different domain:

Map(k1,v1)list(k2,v2)

The Map function is applied in parallel to every pair (keyed by k1) in the input dataset. This produces a list of pairs (keyed by k2) for each call. After that, the MapReduce framework collects all pairs with the same key (k2) from all lists and groups them together, creating one group for each key.

The Reduce function is then applied in parallel to each group, which in turn produces a collection of values in the same domain:

Reduce(k2, list (v2))list((k3, v3))

Each Reduce call typically produces either one key value pair or an empty return, though one call is allowed to return more than one key value pair. The returns of all calls are collected as the desired result list.

Thus the MapReduce framework transforms a list of (key, value) pairs into another list of (key, value) pairs. This behavior is different from the typical functional programming map and reduce combination, which accepts a list of arbitrary values and returns one single value that combines all the values returned by map.

It is necessary but not sufficient to have implementations of the map and reduce abstractions in order to implement MapReduce. Distributed implementations of MapReduce require a means of connecting the processes performing the Map and Reduce phases. This may be a distributed file system. Other options are possible, such as direct streaming from mappers to reducers, or for the mapping processors to serve up their results to reducers that query them.

Examples

The canonical MapReduce example counts the appearance of each word in a set of documents:

function map(String name, String document):
    // name: document name
    // document: document contents
    for each word w in document:
        emit (w, 1)
function reduce(String word, Iterator partialCounts):
    // word: a word
    // partialCounts: a list of aggregated partial counts
    sum = 0
    for each pc in partialCounts:
        sum += pc
    emit (word, sum)

Here, each document is split into words, and each word is counted by the map function, using the word as the result key. The framework puts together all the pairs with the same key and feeds them to the same call to reduce. Thus, this function just needs to sum all of its input values to find the total appearances of that word.

As another example, imagine that for a database of 1.1 billion people, one would like to compute the average number of social contacts a person has according to age. In SQL, such a query could be expressed as:

  SELECT age, AVG(contacts)
    FROM social.person
GROUP BY age
ORDER BY age

Using MapReduce, the K1 key values could be the integers 1 through 1100, each representing a batch of 1 million records, the K2 key value could be a person's age in years, and this computation could be achieved using the following functions:

function Map is
    input: integer K1 between 1 and 1100, representing a batch of 1 million social.person records
    for each social.person record in the K1 batch do
        let Y be the person's age
        let N be the number of contacts the person has
        produce one output record (Y,(N,1))
    repeat
end function
function Reduce is
    input: age (in years) Y
    for each input record (Y,(N,C)) do
        Accumulate in S the sum of N*C
        Accumulate in Cnew the sum of C
    repeat
    let A be S/Cnew
    produce one output record (Y,(A,Cnew))
end function

Note that in the Reduce function, C is the count of people having in total N contacts, so in the Map function it is natural to write C=1, since every output pair is referring to the contacts of one single person.

The MapReduce system would line up the 1100 Map processors, and would provide each with its corresponding 1 million input records. The Map step would produce 1.1 billion (Y,(N,1)) records, with Y values ranging between, say, 8 and 103. The MapReduce System would then line up the 96 Reduce processors by performing shuffling operation of the key/value pairs due to the fact that we need average per age, and provide each with its millions of corresponding input records. The Reduce step would result in the much reduced set of only 96 output records (Y,A), which would be put in the final result file, sorted by Y.

The count info in the record is important if the processing is reduced more than one time. If we did not add the count of the records, the computed average would be wrong, for example:

-- map output #1: age, quantity of contacts
10, 9
10, 9
10, 9
-- map output #2: age, quantity of contacts
10, 9
10, 9
-- map output #3: age, quantity of contacts
10, 10

If we reduce files #1 and #2, we will have a new file with an average of 9 contacts for a 10-year-old person ((9+9+9+9+9)/5):

-- reduce step #1: age, average of contacts
10, 9

If we reduce it with file #3, we lose the count of how many records we've already seen, so we end up with an average of 9.5 contacts for a 10-year-old person ((9+10)/2), which is wrong. The correct answer is 9.166 = 55 / 6 = (9×3+9×2+10×1)/(3+2+1).

Dataflow

Software framework architecture adheres to open-closed principle where code is effectively divided into unmodifiable frozen spots and extensible hot spots. The frozen spot of the MapReduce framework is a large distributed sort. The hot spots, which the application defines, are:

  • an input reader
  • a Map function
  • a partition function
  • a compare function
  • a Reduce function
  • an output writer

Input reader

The input reader divides the input into appropriate size 'splits' (in practice, typically, 64 MB to 128 MB) and the framework assigns one split to each Map function. The input reader reads data from stable storage (typically, a distributed file system) and generates key/value pairs.

A common example will read a directory full of text files and return each line as a record.

Map function

The Map function takes a series of key/value pairs, processes each, and generates zero or more output key/value pairs. The input and output types of the map can be (and often are) different from each other.

If the application is doing a word count, the map function would break the line into words and output a key/value pair for each word. Each output pair would contain the word as the key and the number of instances of that word in the line as the value.

Partition function

Each Map function output is allocated to a particular reducer by the application's partition function for sharding purposes. The partition function is given the key and the number of reducers and returns the index of the desired reducer.

A typical default is to hash the key and use the hash value modulo the number of reducers. It is important to pick a partition function that gives an approximately uniform distribution of data per shard for load-balancing purposes, otherwise the MapReduce operation can be held up waiting for slow reducers to finish (i.e. the reducers assigned the larger shares of the non-uniformly partitioned data).

Between the map and reduce stages, the data are shuffled (parallel-sorted / exchanged between nodes) in order to move the data from the map node that produced them to the shard in which they will be reduced. The shuffle can sometimes take longer than the computation time depending on network bandwidth, CPU speeds, data produced and time taken by map and reduce computations.

Comparison function

The input for each Reduce is pulled from the machine where the Map ran and sorted using the application's comparison function.

Reduce function

The framework calls the application's Reduce function once for each unique key in the sorted order. The Reduce can iterate through the values that are associated with that key and produce zero or more outputs.

In the word count example, the Reduce function takes the input values, sums them and generates a single output of the word and the final sum.

Output writer

The Output Writer writes the output of the Reduce to the stable storage.

Theoretical background

Properties of monoids are the basis for ensuring the validity of MapReduce operations.

In the Algebird package a Scala implementation of Map/Reduce explicitly requires a monoid class type .

The operations of MapReduce deal with two types: the type A of input data being mapped, and the type B of output data being reduced.

The Map operation takes individual values of type A and produces, for each a:A a value b:B; The Reduce operation requires a binary operation • defined on values of type B; it consists of folding all available b:B to a single value.

From a basic requirements point of view, any MapReduce operation must involve the ability to arbitrarily regroup data being reduced. Such a requirement amounts to two properties of the operation •:

  • associativity: (xy) • z = x • (yz)
  • existence of neutral element e such that ex = xe = x for every x:B.

The second property guarantees that, when parallelized over multiple nodes, the nodes that don't have any data to process would have no impact on the result.

These two properties amount to having a monoid (B, •, e) on values of type B with operation • and with neutral element e.

There's no requirements on the values of type A; an arbitrary function AB can be used for the Map operation. This means that we have a catamorphism A* → (B, •, e). Here A* denotes a Kleene star, also known as the type of lists over A.

The Shuffle operation per se is not related to the essence of MapReduce; it's needed to distribute calculations over the cloud.

It follows from the above that not every binary Reduce operation will work in MapReduce. Here are the counter-examples:

  • building a tree from subtrees: this operation is not associative, and the result will depend on grouping;
  • direct calculation of averages: avg is also not associative (and it has no neutral element); to calculate an average, one needs to calculate moments.

Performance considerations

MapReduce programs are not guaranteed to be fast. The main benefit of this programming model is to exploit the optimized shuffle operation of the platform, and only having to write the Map and Reduce parts of the program. In practice, the author of a MapReduce program however has to take the shuffle step into consideration; in particular the partition function and the amount of data written by the Map function can have a large impact on the performance and scalability. Additional modules such as the Combiner function can help to reduce the amount of data written to disk, and transmitted over the network. MapReduce applications can achieve sub-linear speedups under specific circumstances.

When designing a MapReduce algorithm, the author needs to choose a good tradeoff between the computation and the communication costs. Communication cost often dominates the computation cost, and many MapReduce implementations are designed to write all communication to distributed storage for crash recovery.

In tuning performance of MapReduce, the complexity of mapping, shuffle, sorting (grouping by the key), and reducing has to be taken into account. The amount of data produced by the mappers is a key parameter that shifts the bulk of the computation cost between mapping and reducing. Reducing includes sorting (grouping of the keys) which has nonlinear complexity. Hence, small partition sizes reduce sorting time, but there is a trade-off because having a large number of reducers may be impractical. The influence of split unit size is marginal (unless chosen particularly badly, say <1MB). The gains from some mappers reading load from local disks, on average, is minor.

For processes that complete quickly, and where the data fits into main memory of a single machine or a small cluster, using a MapReduce framework usually is not effective. Since these frameworks are designed to recover from the loss of whole nodes during the computation, they write interim results to distributed storage. This crash recovery is expensive, and only pays off when the computation involves many computers and a long runtime of the computation. A task that completes in seconds can just be restarted in the case of an error, and the likelihood of at least one machine failing grows quickly with the cluster size. On such problems, implementations keeping all data in memory and simply restarting a computation on node failures or —when the data is small enough— non-distributed solutions will often be faster than a MapReduce system.

Distribution and reliability

MapReduce achieves reliability by parceling out a number of operations on the set of data to each node in the network. Each node is expected to report back periodically with completed work and status updates. If a node falls silent for longer than that interval, the master node (similar to the master server in the Google File System) records the node as dead and sends out the node's assigned work to other nodes. Individual operations use atomic operations for naming file outputs as a check to ensure that there are not parallel conflicting threads running. When files are renamed, it is possible to also copy them to another name in addition to the name of the task (allowing for side-effects).

The reduce operations operate much the same way. Because of their inferior properties with regard to parallel operations, the master node attempts to schedule reduce operations on the same node, or in the same rack as the node holding the data being operated on. This property is desirable as it conserves bandwidth across the backbone network of the datacenter.

Implementations are not necessarily highly reliable. For example, in older versions of Hadoop the NameNode was a single point of failure for the distributed filesystem. Later versions of Hadoop have high availability with an active/passive failover for the "NameNode."

Uses

MapReduce is useful in a wide range of applications, including distributed pattern-based searching, distributed sorting, web link-graph reversal, Singular Value Decomposition, web access log stats, inverted index construction, document clustering, machine learning, and statistical machine translation. Moreover, the MapReduce model has been adapted to several computing environments like multi-core and many-core systems, desktop grids, multi-cluster, volunteer computing environments, dynamic cloud environments, mobile environments, and high-performance computing environments.

At Google, MapReduce was used to completely regenerate Google's index of the World Wide Web. It replaced the old ad hoc programs that updated the index and ran the various analyses. Development at Google has since moved on to technologies such as Percolator, FlumeJava and MillWheel that offer streaming operation and updates instead of batch processing, to allow integrating "live" search results without rebuilding the complete index.

MapReduce's stable inputs and outputs are usually stored in a distributed file system. The transient data are usually stored on local disk and fetched remotely by the reducers.

Criticism

Lack of novelty

David DeWitt and Michael Stonebraker, computer scientists specializing in parallel databases and shared-nothing architectures, have been critical of the breadth of problems that MapReduce can be used for. They called its interface too low-level and questioned whether it really represents the paradigm shift its proponents have claimed it is. They challenged the MapReduce proponents' claims of novelty, citing Teradata as an example of prior art that has existed for over two decades. They also compared MapReduce programmers to CODASYL programmers, noting both are "writing in a low-level language performing low-level record manipulation." MapReduce's use of input files and lack of schema support prevents the performance improvements enabled by common database system features such as B-trees and hash partitioning, though projects such as Pig (or PigLatin), Sawzall, Apache Hive, HBase and Bigtable are addressing some of these problems.

Greg Jorgensen wrote an article rejecting these views. Jorgensen asserts that DeWitt and Stonebraker's entire analysis is groundless as MapReduce was never designed nor intended to be used as a database.

DeWitt and Stonebraker have subsequently published a detailed benchmark study in 2009 comparing performance of Hadoop's MapReduce and RDBMS approaches on several specific problems. They concluded that relational databases offer real advantages for many kinds of data use, especially on complex processing or where the data is used across an enterprise, but that MapReduce may be easier for users to adopt for simple or one-time processing tasks.

The MapReduce programming paradigm was also described in Danny Hillis's 1985 thesis intended for use on the Connection Machine, where it was called "xapping/reduction" and relied upon that machine's special hardware to accelerate both map and reduce. The dialect ultimately used for the Connection Machine, the 1986 StarLisp, had parallel *map and reduce!!, which in turn was based on the 1984 Common Lisp, which had non-parallel map and reduce built in. The tree-like approach that the Connection Machine's hypercube architecture uses to execute reduce in O ( log n ) {\displaystyle O(\log n)} time is effectively the same as the approach referred to within the Google paper as prior work.

In 2010 Google was granted what is described as a patent on MapReduce. The patent, filed in 2004, may cover use of MapReduce by open source software such as Hadoop, CouchDB, and others. In Ars Technica, an editor acknowledged Google's role in popularizing the MapReduce concept, but questioned whether the patent was valid or novel. In 2013, as part of its "Open Patent Non-Assertion (OPN) Pledge", Google pledged to only use the patent defensively. The patent is expected to expire on 23 December 2026.

Restricted programming framework

MapReduce tasks must be written as acyclic dataflow programs, i.e. a stateless mapper followed by a stateless reducer, that are executed by a batch job scheduler. This paradigm makes repeated querying of datasets difficult and imposes limitations that are felt in fields such as graph processing where iterative algorithms that revisit a single working set multiple times are the norm, as well as, in the presence of disk-based data with high latency, even the field of machine learning where multiple passes through the data are required even though algorithms can tolerate serial access to the data each pass.

See also

Implementations of MapReduce

References

  1. "MapReduce Tutorial". Apache Hadoop. Retrieved 3 July 2019.
  2. "Google spotlights data center inner workings". cnet.com. 30 May 2008. Archived from the original on 19 October 2013. Retrieved 31 May 2008.
  3. ^ "MapReduce: Simplified Data Processing on Large Clusters" (PDF). googleusercontent.com.
  4. Wickham, Hadley (2011). "The split-apply-combine strategy for data analysis". Journal of Statistical Software. 40: 1–29. doi:10.18637/jss.v040.i01.
  5. "Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages." -"MapReduce: Simplified Data Processing on Large Clusters", by Jeffrey Dean and Sanjay Ghemawat; from Google Research
  6. Lämmel, R. (2008). "Google's Map Reduce programming model — Revisited". Science of Computer Programming. 70: 1–30. doi:10.1016/j.scico.2007.07.001.
  7. http://www.mcs.anl.gov/research/projects/mpi/mpi-standard/mpi-report-2.0/mpi2-report.htm MPI 2 standard
  8. "MPI Reduce and Allreduce · MPI Tutorial". mpitutorial.com.
  9. "Performing Parallel Rank with MPI · MPI Tutorial". mpitutorial.com.
  10. "MongoDB: Terrible MapReduce Performance". Stack Overflow. October 16, 2010. The MapReduce implementation in MongoDB has little to do with map reduce apparently. Because for all I read, it is single-threaded, while map-reduce is meant to be used highly parallel on a cluster. ... MongoDB MapReduce is single threaded on a single server...
  11. ^ Ullman, J. D. (2012). "Designing good MapReduce algorithms". XRDS: Crossroads, the ACM Magazine for Students. 19: 30–34. doi:10.1145/2331042.2331053. S2CID 26498063.
  12. Sverdlik, Yevgeniy (2014-06-25). "Google Dumps MapReduce in Favor of New Hyper-Scale Analytics System". Data Center Knowledge. Retrieved 2015-10-25. "We don't really use MapReduce anymore"
  13. "Why MapReduce Is Still A Dominant Approach For Large-Scale Machine Learning". Analytics India. April 5, 2019.
  14. Czajkowski, Grzegorz; Marián Dvorský; Jerry Zhao; Michael Conley (7 September 2011). "Sorting Petabytes with MapReduce – The Next Episode". Retrieved 7 April 2014.
  15. "MapReduce Tutorial".
  16. "Apache/Hadoop-mapreduce". GitHub. 31 August 2021.
  17. "Example: Count word occurrences". Google Research. Retrieved September 18, 2013.
  18. Fegaras, Leonidas (2017). "An algebra for distributed Big Data analytics". Journal of Functional Programming. 28. doi:10.1017/S0956796817000193. S2CID 44629767.
  19. Lin, Jimmy (29 Apr 2013). "Monoidify! Monoids as a Design Principle for Efficient MapReduce Algorithms". arXiv:1304.7544 .
  20. "Abstract Algebra for Scala".
  21. "Encoding Map-Reduce As A Monoid With Left Folding". 5 September 2016.
  22. ^ Senger, Hermes; Gil-Costa, Veronica; Arantes, Luciana; Marcondes, Cesar A. C.; Marín, Mauricio; Sato, Liria M.; da Silva, Fabrício A.B. (2015-01-01). "BSP cost and scalability analysis for MapReduce operations". Concurrency and Computation: Practice and Experience. 28 (8): 2503–2527. doi:10.1002/cpe.3628. hdl:10533/147670. ISSN 1532-0634. S2CID 33645927.
  23. Berlińska, Joanna; Drozdowski, Maciej (2010-12-01). "Scheduling divisible MapReduce computations". Journal of Parallel and Distributed Computing. 71 (3): 450–459. doi:10.1016/j.jpdc.2010.12.004.
  24. Bosagh Zadeh, Reza; Carlsson, Gunnar (2013). "Dimension Independent Matrix Square Using MapReduce" (PDF). Stanford University. arXiv:1304.1467. Bibcode:2013arXiv1304.1467B. Retrieved 12 July 2014.
  25. Ng, Andrew Y.; Bradski, Gary; Chu, Cheng-Tao; Olukotun, Kunle; Kim, Sang Kyun; Lin, Yi-An; Yu, YuanYuan (2006). "Map-Reduce for Machine Learning on Multicore". NIPS 2006.
  26. Ranger, C.; Raghuraman, R.; Penmetsa, A.; Bradski, G.; Kozyrakis, C. (2007). "Evaluating MapReduce for Multi-core and Multiprocessor Systems". 2007 IEEE 13th International Symposium on High Performance Computer Architecture. p. 13. CiteSeerX 10.1.1.220.8210. doi:10.1109/HPCA.2007.346181. ISBN 978-1-4244-0804-7. S2CID 12563671.
  27. He, B.; Fang, W.; Luo, Q.; Govindaraju, N. K.; Wang, T. (2008). "Mars: a MapReduce framework on graphics processors" (PDF). Proceedings of the 17th international conference on Parallel architectures and compilation techniques – PACT '08. p. 260. doi:10.1145/1454115.1454152. ISBN 9781605582825. S2CID 207169888.
  28. Chen, R.; Chen, H.; Zang, B. (2010). "Tiled-MapReduce: optimizing resource usages of data-parallel applications on multicore with tiling". Proceedings of the 19th international conference on Parallel architectures and compilation techniques – PACT '10. p. 523. doi:10.1145/1854273.1854337. ISBN 9781450301787. S2CID 2082196.
  29. Tang, B.; Moca, M.; Chevalier, S.; He, H.; Fedak, G. (2010). "Towards MapReduce for Desktop Grid Computing" (PDF). 2010 International Conference on P2P, Parallel, Grid, Cloud and Internet Computing. p. 193. CiteSeerX 10.1.1.671.2763. doi:10.1109/3PGCIC.2010.33. ISBN 978-1-4244-8538-3. S2CID 15044391.
  30. Luo, Y.; Guo, Z.; Sun, Y.; Plale, B.; Qiu, J.; Li, W. (2011). "A Hierarchical Framework for Cross-Domain MapReduce Execution" (PDF). Proceedings of the second international workshop on Emerging computational methods for the life sciences (ECMLS '11). CiteSeerX 10.1.1.364.9898. doi:10.1145/1996023.1996026. ISBN 978-1-4503-0702-4. S2CID 15179363.
  31. Lin, H.; Ma, X.; Archuleta, J.; Feng, W. C.; Gardner, M.; Zhang, Z. (2010). "MOON: MapReduce On Opportunistic eNvironments" (PDF). Proceedings of the 19th ACM International Symposium on High Performance Distributed Computing – HPDC '10. p. 95. doi:10.1145/1851476.1851489. ISBN 9781605589428. S2CID 2351790.
  32. Marozzo, F.; Talia, D.; Trunfio, P. (2012). "P2P-MapReduce: Parallel data processing in dynamic Cloud environments". Journal of Computer and System Sciences. 78 (5): 1382–1402. doi:10.1016/j.jcss.2011.12.021.
  33. Dou, A.; Kalogeraki, V.; Gunopulos, D.; Mielikainen, T.; Tuulos, V. H. (2010). "Misco: a MapReduce framework for mobile systems". Proceedings of the 3rd International Conference on PErvasive Technologies Related to Assistive Environments – PETRA '10. p. 1. doi:10.1145/1839294.1839332. ISBN 9781450300711. S2CID 14517696.
  34. Wang, Yandong; Goldstone, Robin; Yu, Weikuan; Wang, Teng (May 2014). "Characterization and Optimization of Memory-Resident MapReduce on HPC Systems". 2014 IEEE 28th International Parallel and Distributed Processing Symposium. IEEE. pp. 799–808. doi:10.1109/IPDPS.2014.87. ISBN 978-1-4799-3800-1. S2CID 11157612.
  35. "How Google Works". baselinemag.com. 7 July 2006. As of October, Google was running about 3,000 computing jobs per day through MapReduce, representing thousands of machine-days, according to a presentation by Dean. Among other things, these batch routines analyze the latest Web pages and update Google's indexes.
  36. Chambers, Craig; Raniwala, Ashish; Perry, Frances; Adams, Stephen; Henry, Robert R.; Bradshaw, Robert; Weizenbaum, Nathan (1 January 2010). "FlumeJava". Proceedings of the 31st ACM SIGPLAN Conference on Programming Language Design and Implementation (PDF). pp. 363–375. doi:10.1145/1806596.1806638. ISBN 9781450300193. S2CID 14888571. Archived from the original (PDF) on 23 September 2016. Retrieved 4 August 2016.
  37. Peng, D., & Dabek, F. (2010, October). Large-scale Incremental Processing Using Distributed Transactions and Notifications. In OSDI (Vol. 10, pp. 1-15).
  38. "Database Experts Jump the MapReduce Shark".
  39. ^ David DeWitt; Michael Stonebraker. "MapReduce: A major step backwards". craig-henderson.blogspot.com. Retrieved 2008-08-27.
  40. "Apache Hive – Index of – Apache Software Foundation".
  41. ^ "HBase – HBase Home – Apache Software Foundation".
  42. "Bigtable: A Distributed Storage System for Structured Data" (PDF).
  43. Greg Jorgensen. "Relational Database Experts Jump The MapReduce Shark". typicalprogrammer.com. Retrieved 2009-11-11.
  44. Pavlo, Andrew; Paulson, Erik; Rasin, Alexander; Abadi, Daniel J.; DeWitt, Deavid J.; Madden, Samuel; Stonebraker, Michael. "A Comparison of Approaches to Large-Scale Data Analysis". Brown University. Retrieved 2010-01-11.
  45. Hillis, W. Danny (1986). The Connection Machine. MIT Press. ISBN 0262081571.
  46. "Connection Machine Model CM-2 Technical Summary" (PDF). Thinking Machines Corporation. 1987-04-01. Retrieved 2022-11-21.
  47. "Supplement to the *Lisp Reference Manual" (PDF). Thinking Machines Corporation. 1988-09-01. Retrieved 2022-11-21.
  48. "Rediflow Architecture Prospectus" (PDF). University of Utah Department of Computer Science. 1986-04-05. Retrieved 2022-11-21.
  49. Ranka, Sanjay (1989). "2.6 Data Sum". Hypercube Algorithms for Image Processing and Pattern Recognition (PDF). University of Florida. Retrieved 2022-12-08.
  50. Paul, Ryan (20 January 2010). "Google's MapReduce patent: what does it mean for Hadoop?". Ars Technica. Retrieved 21 March 2021.
  51. "United States Patent: 7650331 - System and method for efficient large-scale data processing". uspto.gov.
  52. Nazer, Daniel (28 March 2013). "Google Makes Open Patent Non-assertion Pledge and Proposes New Licensing Models". Electronic Frontier Foundation. Retrieved 21 March 2021.
  53. King, Rachel (2013). "Google expands open patent pledge to 79 more about data center management". ZDNet. Retrieved 21 March 2021.
  54. "System and method for efficient large-scale data processing". Google Patents Search. 18 June 2004. Retrieved 21 March 2021.
  55. Gupta, Upa; Fegaras, Leonidas (2013-10-06). "Map-Based Graph Analysis on MapReduce" (PDF). Proceedings: 2013 IEEE International Conference on Big Data. 2013 IEEE International Conference on Big Data. Santa Clara, California: IEEE. pp. 24–30.
  56. Zaharia, Matei; Chowdhury, Mosharaf; Franklin, Michael; Shenker, Scott; Stoica, Ion (June 2010). Spark: Cluster Computing with Working Sets (PDF). HotCloud 2010.
Google
a subsidiary of Alphabet
Company
Divisions
Subsidiaries
Active
Defunct
Programs
Events
Infrastructure
People
Current
Former
Criticism
General
Incidents
Other
Development
Software
A–C
D–N
O–Z
Operating systems
Language models
Neural networks
Computer programs
Formats and codecs
Programming languages
Search algorithms
Domain names
Typefaces
Products (software and services)
Defunct or discontinued
Hardware
Pixel
Smartphones
Smartwatches
Tablets
Laptops
Other
Nexus
Smartphones
Tablets
Other
Other
Litigation
Advertising
Antitrust
Intellectual property
Privacy
Other
Related
Concepts
Products
Android
Street View coverage
YouTube
Other
Documentaries
Books
Popular culture
Other
Italics denote discontinued products.
Categories: