We have a group of mappers that partition the keys across a set of reducers, which then work on that same data. The bottleneck is the shuffle: the moment when the mappers finish and need to hand their data over to the reducers.
Introduction
Common input formats
You need to know the following well:
- Shards
- Textual input
- Binary formats, Parquet, and similar
- CSV and similar
General framework
We can see in the image taken from the course book [({fourny} 2024)](https://ghislainfourny.github.io/big-data-textbook/) that the MapReduce framework has the following general phases:
1. The map phase: the data is divided into chunks (splits) and processed by the mappers, which produce key-value pairs destined for designated reducers.
2. The shuffle: each reducer connects to all the other machines to fetch the mapped values assigned to it. This is usually the bottleneck, the part that runs in $\mathcal{O}(n^{2})$.
3. The reduce phase: the reducers process the partitioned, now homogeneous data and return a result, which is often a file in HDFS.

(Figure: MapReduce Architecture)
We can process terabytes of data with thousands of nodes with this architecture.
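As a concrete instance of these phases, below is a minimal word-count sketch using the Hadoop MapReduce Java API; the class and variable names are illustrative, not taken from the course material.

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    // Map phase: each mapper reads one split of the input and emits (word, 1)
    // pairs; the framework shuffles them to the reducer responsible for each key.
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(line.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reduce phase: each reducer receives all values shuffled to it for a given
    // key, aggregates them, and writes the result (typically to a file in HDFS).
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable c : counts) {
                sum += c.get();
            }
            result.set(sum);
            context.write(word, result);
        }
    }
}
```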
Centralized Architecture
Hadoop MapReduce builds upon a centralized architecture that is compatible with distributed file systems and HBase. There is a JobTracker, often on the same node as the NameNode, and TaskTrackers that sit on the same nodes as the DataNodes. A worker node therefore runs several processes at once (DataNode, TaskTracker, and possibly an HBase RegionServer). When a MapReduce job finishes, each TaskTracker usually produces a file on disk (the output is sharded, with files numbered in increasing order). If a task fails on a node, the JobTracker usually re-assigns it to another node.
Impedance Mismatch
MapReduce needs key-value pairs to process files. For text, we usually use each line as a value, keyed by its character offset in the file. We could also use a special character to separate the different parts of the text. This gap between how the data is stored and the key-value form MapReduce expects is called the impedance mismatch, and bridging it is part of preprocessing the dataset.
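A small sketch of how this keying can be influenced in Hadoop: the default `TextInputFormat` keys each record by its byte offset and splits records on newlines, and the record separator can be changed through a configuration property. The job name and the blank-line delimiter below are just illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ParagraphInputSetup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // By default, TextInputFormat keys each record by its byte offset and
        // splits records on newlines. Here we (hypothetically) use a blank line
        // as the separator, so each record is a whole paragraph.
        conf.set("textinputformat.record.delimiter", "\n\n");
        Job job = Job.getInstance(conf, "paragraph-records");
        // ... set mapper, reducer, input and output paths as usual ...
    }
}
```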
Combining
This is a form of optimization: while mapping, we can already do part of the reducing step. Often the combining step is literally the same function as the reducing step, but that is only possible when:
- the intermediate types match the reduce function's input and output types;
- the reduce function is commutative and associative.
With a combining step in place, much less data has to travel over the network during the shuffle, which makes the whole job faster.
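In Hadoop this is wired up in the job driver. A hedged sketch, assuming the `WordCount` classes from the earlier example (summation satisfies both conditions above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count with combiner");
        job.setJarByClass(WordCountDriver.class);
        // Mapper and reducer from the earlier WordCount sketch (assumed to be
        // in the same package).
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        // Summation keeps the same (Text, IntWritable) types and is commutative
        // and associative, so the reducer can double as the combiner.
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```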
Rule of thumb: the number of map and reduce tasks should be about 3x the number of nodes doing the mapping and reducing.
On Terminology
You need to know what mappers, reducers, and combiners are. You also need to know what tasks, slots, and phases are.
Slots are bundles of CPU cores, memory, and network bandwidth. A single node usually has more than one slot, which helps accommodate the differing resource needs of tasks: some need more memory, others on the same node need less, so the load balances out. TODO: write this section better.
Splits and Blocks
Splits are the portions of the whole dataset handed to individual map (or reduce) tasks. Physically, the data is stored in HDFS blocks, which have a fixed size (128 MB by default). This means the first and last key-value pairs of a split can straddle block boundaries, which is an inconvenience: one then also needs to fetch part of the preceding or following block. This is why HDFS's API allows reading blocks partially. The underlying details of how these optimizations are done are not covered here.
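The split size is configurable independently of the block size. A minimal sketch, assuming standard Hadoop property names (the concrete values below are purely illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SplitSizeSetup {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // HDFS block size: 128 MB is the default, but it can be changed.
        conf.setLong("dfs.blocksize", 128L * 1024 * 1024);
        // Lower and upper bounds on the logical split handed to each mapper.
        conf.setLong("mapreduce.input.fileinputformat.split.minsize", 64L * 1024 * 1024);
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
        Job job = Job.getInstance(conf, "split-size-demo");
        // ... set mapper, reducer, input and output paths as usual ...
    }
}
```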
Resource Management
We now introduce ResourceManagers, ApplicationMasters, and NodeManagers.
Bottlenecks of the original version
The JobTracker was the bottleneck, as it had many responsibilities.
The JobTracker is a single point of failure
Some responsibilities of the Jobtracker
- Resource Management
- Monitoring every task’s progress
- Job scheduling
Scalability issues
The JobTracker is usually the bottleneck in this model: clusters of about 4,000 nodes and about 10,000 tasks usually become very slow. Another drawback is that many reducers are idle during the map phase.
Containers are virtual collections of slots (one container can hold more than one map or reduce slot), each with some CPU and some amount of memory.
Other bottlenecks
- Scalability: MapReduce v1 has limited scalability, while YARN can scale to 10,000 nodes.
- Rigidity: MapReduce v1 only supports MapReduce-specific jobs. There is a need, however, for scheduling non-MapReduce workloads; for instance, we would like to share the cluster with MPI, graph processing, and arbitrary user code.
- Resource utilization: in MapReduce v1, the reducers wait on the mappers to finish (and vice versa), leaving large fractions of time when either the reducers or the mappers are idle. Ideally, all resources should be in use at any given time.
- Flexibility: mapper and reducer roles are decided at configuration time and cannot be reconfigured.
Spark also keeps data in memory, which makes access to the data much faster.
Advantages of MapReduce
Nonetheless, there could be cases where MapReduce is better than Spark, for example when the data is too big to fit in memory.
- Ultra-large datasets that do not fit in memory: For massive datasets that don’t fit even across a large Spark cluster’s memory, MapReduce’s disk-based processing can be more manageable.
- Strict fault tolerance requirements: MapReduce’s fault tolerance mechanism is very robust, as it writes intermediate results to HDFS, which may be preferred in environments where data recovery is critical.
- Low hardware resources: Spark requires more memory and computational resources than MapReduce, so in resource-constrained environments, MapReduce may be a more practical choice.
ApplicationMaster
This is an innovation introduced by the YARN architecture. With this new architecture we have containers and a ResourceManager that allocates resources to the tasks that need to be processed. When a user starts a job, the ResourceManager elevates one container to be the ApplicationMaster (by communicating with the NodeManager that hosts it). This container then takes over part of the old JobTracker's responsibilities: it tracks task state, re-allocates containers when one fails, and requests or releases resources from the ResourceManager at any time.
Setting up a task
ApplicationMasters can request and release containers at any time, dynamically. A container request is typically made by the ApplicationMasters with a specific demand (e.g., “10 containers with each 2 cores and 16 GB of RAM”). If the request is granted by the ResourceManager fully or partially, this is done indirectly by signing and issuing a container token to the ApplicationMaster that acts as proof that the resource was granted. The ApplicationMaster can then connect to the allocated NodeManager and send the token. The NodeManager will then check the validity of the token and provide the memory and CPU granted by the ResourceManager. The ApplicationMaster ships the code (e.g., as a jar file) as well as parameters, which then runs as a process with exclusive use of this memory and CPU. ~From the Book ({fourny} 2024)
The main facts to keep in mind:
- ApplicationMasters can set up and release resources dynamically at any time
- ApplicationMasters can request resources in a fine-grained manner, asking the ResourceManager for exactly the amount of resources they need
- The granting procedure uses authorization tokens.
- Code is usually shipped by the ApplicationMaster to the worker container.
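A hedged sketch of this request/grant flow using YARN's `AMRMClient` Java API; the resource amounts mirror the example above, and error handling as well as the actual container-launch step are omitted.

```java
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AmResourceRequestSketch {
    public static void main(String[] args) throws Exception {
        // The ApplicationMaster registers itself with the ResourceManager.
        AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
        rmClient.init(new YarnConfiguration());
        rmClient.start();
        rmClient.registerApplicationMaster("", 0, "");

        // Fine-grained request: 10 containers, each with 2 cores and 16 GB of RAM.
        Resource capability = Resource.newInstance(16 * 1024, 2); // memory in MB, vcores
        Priority priority = Priority.newInstance(0);
        for (int i = 0; i < 10; i++) {
            rmClient.addContainerRequest(new ContainerRequest(capability, null, null, priority));
        }

        // Granted containers (carrying their tokens) come back through allocate();
        // the AM would then contact the NodeManagers to launch its code in them.
        rmClient.allocate(0.0f);

        // Resources can also be released at any time; here we simply deregister.
        rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
        rmClient.stop();
    }
}
```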
Performance
This new version supports 10,000 nodes and 100,000 tasks, a big improvement over the previous version.
Resource tracking
Also in this case, as is done similarly for distributed file systems and other systems, the available resources of each machine are tracked using heartbeats sent by the NodeManagers to the ResourceManager. In this way, the ResourceManager knows exactly what the load of each node is.
Scheduling methods
Queue scheduler
The simplest option is the queue scheduler, where each job takes the whole cluster, one job after the other. This is not very efficient, as it does not allow multi-tenancy of the cluster.
Capacity Scheduler
Another option is a hierarchical scheduler where the amount of resources is allocated a priori. This is also called capacity scheduling because the whole cluster is divided into multiple sub-clusters, each corresponding to a specific department of a university or a part of a company. Each department can then use its own local scheduler, which could be a standard fair queue.
Fair Scheduler
It is difficult to orchestrate the allocation of several different resources at once: how do we allocate fairly when processes need resources in different proportions? Some processes need more memory than CPU, others need more disk or network I/O, and so on. One solution is dominant resource fairness (DRF), but we won't delve into this topic here. 🟥 TODO: get the missing parts for dominant resource fairness.
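To give a flavor of the idea (a sketch based on the standard DRF illustration, not from the course notes): suppose a cluster has 9 CPUs and 18 GB of RAM, user A's tasks each need 1 CPU and 4 GB, and user B's tasks each need 3 CPUs and 1 GB. A user's dominant resource is the one of which their tasks consume the largest fraction of the cluster, and DRF tries to equalize the users' dominant shares:

$$\text{share}_A = \max\!\left(\tfrac{1}{9}, \tfrac{4}{18}\right) = \tfrac{2}{9} \;\text{(RAM)}, \qquad \text{share}_B = \max\!\left(\tfrac{3}{9}, \tfrac{1}{18}\right) = \tfrac{1}{3} \;\text{(CPU)}.$$

Giving A three tasks (3 CPUs, 12 GB) and B two tasks (6 CPUs, 2 GB) makes both dominant shares equal: $\tfrac{12}{18} = \tfrac{6}{9} = \tfrac{2}{3}$.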
Three types of schedule sharing
TODO: fill this in on a later occasion (see page 273 of the book)
- Steady fair share:
- Instantaneous fair share:
- Current share:
References
[1] Ghislain Fourny. "The Big Data Textbook", 2024. https://ghislainfourny.github.io/big-data-textbook/