Ooso lets you run MapReduce jobs in a serverless way. It is based on managed cloud services, Amazon S3 and AWS Lambda, and is mainly an alternative to standard ad-hoc querying and batch processing tools such as Hadoop and Spark.
The library workflow is as follows:
- The workflow begins by invoking the Mappers Driver lambda function
- The Mappers Driver does two things:
  - It computes batches of data splits and assigns each batch to a Mapper
  - It invokes a Mappers Listener lambda function, which is responsible for detecting the end of the map phase
- Once the Mappers Listener detects the end of the map phase, it invokes a first instance of the Reducers Driver function
- The Reducers Driver is somewhat similar to the Mappers Driver:
  - It computes batches from either the Map Output Bucket, if we are in the first step of the reduce phase, or from the previous reducers' outputs located in the Reduce Output Bucket. It then assigns each batch to a Reducer
  - It also invokes a Reducers Listener for each step of the reduce phase
- Once the Reducers Listener detects the end of a reduce step, it decides whether to invoke the next Reducers Driver: it does so only if the previous reduce step produced more than one file. Otherwise, there is no need to invoke a Reducers Driver, because the previous step produced a single file, which is the result of the job
The easiest way to get started is to clone the repository and use the provided example project directory, which has the following structure:
.
├── package.sh
├── provide_job_info.py
├── pom.xml
└── src
└── main
├── java
│ ├── mapper
│ │ └── Mapper.java
│ └── reducer
│ └── Reducer.java
└── resources
└── jobInfo.json
Declare the library dependency in the pom.xml file:
<dependencies>
...
<dependency>
<groupId>fr.d2-si</groupId>
<artifactId>ooso</artifactId>
<version>0.0.4</version>
</dependency>
...
</dependencies>
Implement your Mapper, Reducer and Launcher:
- The class Mapper is the implementation of your mappers. It must extend the fr.d2si.ooso.mapper.MapperAbstract class, which looks like the following:

        public abstract class MapperAbstract {
            public abstract String map(BufferedReader objectBufferedReader);
        }

  The map method receives a BufferedReader as a parameter, which is a reader of the batch part that the mapper lambda processes. The reader closing is done internally for you. (A complete line-count Mapper is sketched after this list.)
- The class Reducer is the implementation of your reducers. It must extend the fr.d2si.ooso.reducer.ReducerAbstract class, which looks like the following:

        public abstract class ReducerAbstract {
            public abstract String reduce(List<ObjectInfoSimple> batch);
        }

  The reduce method receives a list of ObjectInfoSimple instances, which encapsulate information about the objects to be reduced. In order to get a reader from an ObjectInfoSimple instance, you can do something like this:

        public String reduce(List<ObjectInfoSimple> batch) {
            try {
                for (ObjectInfoSimple objectInfo : batch) {
                    BufferedReader objectBufferedReader = Commons.getReaderFromObjectInfo(objectInfo);
                    //do something with the reader, then close it
                    objectBufferedReader.close();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return "the result of the reduction";
        }

  For the reducer, you are responsible for closing the opened readers. (A complete summing Reducer is sketched after this list.)
- The Launcher is responsible for starting your job. Under the hood, it serializes your Mapper and Reducer and sends them to your Mappers Driver, which then propagates them to the rest of the lambdas. All you need to do is create a class with a main method and instantiate a Launcher that points to your Mapper and Reducer. Your class should look like this:

        public class JobLauncher {
            public static void main(String[] args) {
                //setup your launcher
                Launcher myLauncher = new Launcher()
                        .withMapper(new Mapper())
                        .withReducer(new Reducer());

                //launch your job
                myLauncher.launchJob();
            }
        }
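To make this concrete, below is a minimal, hypothetical line-count job: each mapper returns the number of lines in its batch part, and each reducer sums the numbers it reads, so every reduce step keeps producing a single number per output file. This is only an illustrative sketch; the package names follow the example project layout above, and the import paths of ObjectInfoSimple and Commons are assumptions, so adapt them to the library version you use.

//mapper/Mapper.java
package mapper;

import java.io.BufferedReader;

import fr.d2si.ooso.mapper.MapperAbstract;

public class Mapper extends MapperAbstract {
    @Override
    public String map(BufferedReader objectBufferedReader) {
        //emit the number of lines of this batch part; the reader is closed by the library
        return String.valueOf(objectBufferedReader.lines().count());
    }
}

//reducer/Reducer.java
package reducer;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.List;

import fr.d2si.ooso.reducer.ReducerAbstract;
//assumed import paths: check where ObjectInfoSimple and Commons live in your library version
import fr.d2si.ooso.utils.Commons;
import fr.d2si.ooso.utils.ObjectInfoSimple;

public class Reducer extends ReducerAbstract {
    @Override
    public String reduce(List<ObjectInfoSimple> batch) {
        long total = 0;
        try {
            for (ObjectInfoSimple objectInfo : batch) {
                BufferedReader reader = Commons.getReaderFromObjectInfo(objectInfo);
                String line;
                while ((line = reader.readLine()) != null) {
                    //each input line is a count produced by a mapper or by a previous reducer
                    if (!line.trim().isEmpty()) {
                        total += Long.parseLong(line.trim());
                    }
                }
                reader.close();
            }
        } catch (IOException e) {
            throw new RuntimeException("Unable to read a reduce input object", e);
        }
        return String.valueOf(total);
    }
}

Because each reducer output is itself a single number, later reduce steps can consume previous reducer outputs with the same logic until only one file remains.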
In order to make our jar package executable, you need to set the main class that serves as the application entry point.
If you are using the maven shade plugin, you can do so as follows:
<build>
    ...
    <plugins>
        <plugin>
            ...
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>job.JobLauncher</mainClass>
                    </transformer>
                </transformers>
            </configuration>
            ...
        </plugin>
    </plugins>
    ...
</build>
Please take a look at one of our example pom.xml files for further details on how to configure your maven project.
Edit the jobInfo.json file located at src/main/resources to reflect your infrastructure details:
{
"jobId": "your-job-id",
"jobInputBucket": "input",
"mapperOutputBucket": "mapper-output",
"reducerOutputBucket": "reducer-output",
"mapperFunctionName": "mapper",
"mappersDriverFunctionName": "mappers_driver",
"mappersListenerFunctionName": "mappers_listener",
"reducerFunctionName": "reducer",
"reducersDriverFunctionName": "reducers_driver",
"reducersListenerFunctionName": "reducers_listener",
"mapperForceBatchSize": "-1",
"reducerForceBatchSize": "-1",
"mapperMemory": "1536",
"reducerMemory": "1536",
"disableReducer": "false"
}
Below is a description of some of the attributes (the rest are self-explanatory).
Attribute | Description |
---|---|
jobId | Used to identify a job and separate outputs in order to avoid overwriting data between jobs |
jobInputBucket | Contains the dataset splits that each Mapper will process |
mapperOutputBucket | The bucket where the mappers will put their results |
reducerOutputBucket | The bucket where the reducers will put their results |
reducerMemory and mapperMemory | The amount of memory (and therefore other resources) allocated to the lambda functions. It is used internally by the library to compute the batch size that each mapper/reducer will process. |
mapperForceBatchSize and reducerForceBatchSize | Used to force the library to use the specified batch size instead of computing it automatically. reducerForceBatchSize must be greater than or equal to 2 |
disableReducer | If set to "true", disables the reducer |
In order to generate the jar file used during the deployment of the lambdas, you need to install Maven. Then, run the package.sh script to create the project jar:
./package.sh
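The package.sh script is part of the example project. If you prefer to invoke Maven directly, the same shaded jar should normally be produced by the standard packaging phase (assuming the shade plugin configuration shown earlier):

mvn clean package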
Before diving into the infrastructure details, please have a look at the deployment section.
Our lambda functions use S3 buckets to fetch the files they need and to store their processing results.
You need three buckets:
- An input bucket containing your data splits
- Two intermediary buckets used by the mappers and reducers
You must use the same bucket names used in the configuration step above.
You may create the buckets using the console, the command line or our Terraform template.
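For example, using the AWS CLI with the placeholder bucket names from the sample jobInfo.json above (S3 bucket names are globally unique, so replace them with your own):

aws s3 mb s3://input
aws s3 mb s3://mapper-output
aws s3 mb s3://reducer-output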
a. Create an IAM role with the following trust policy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
You may create the IAM role using the console, the command line or our Terraform template.
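For example, with the AWS CLI, assuming the trust policy above is saved as trust-policy.json and using a hypothetical role name of ooso-lambda-role:

aws iam create-role \
    --role-name ooso-lambda-role \
    --assume-role-policy-document file://trust-policy.json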
b. Attach the following policies to your role
arn:aws:iam::aws:policy/AWSLambdaFullAccess
arn:aws:iam::aws:policy/AmazonS3FullAccess
Note that these policies are broader than necessary; you may use more fine-grained policies/roles for each lambda.
You may attach the policies using the console, the command line or our Terraform template.
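For example, with the AWS CLI and the same hypothetical role name as above:

aws iam attach-role-policy \
    --role-name ooso-lambda-role \
    --policy-arn arn:aws:iam::aws:policy/AWSLambdaFullAccess

aws iam attach-role-policy \
    --role-name ooso-lambda-role \
    --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess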
Create the required lambdas with the following details:
Lambda Name | Handler | Memory | Function package | Runtime |
---|---|---|---|---|
mappers_driver | fr.d2si.ooso.mappers_driver.MappersDriver | 1536 | example-project/target/job.jar | java8 |
mappers_listener | fr.d2si.ooso.mappers_listener.MappersListener | 1536 | example-project/target/job.jar | java8 |
mapper | fr.d2si.ooso.mapper_wrapper.MapperWrapper | same value as in the configuration file | example-project/target/job.jar | java8 |
reducers_driver | fr.d2si.ooso.reducers_driver.ReducersDriver | 1536 | example-project/target/job.jar | java8 |
reducers_listener | fr.d2si.ooso.reducers_listener.ReducersListener | 1536 | example-project/target/job.jar | java8 |
reducer | fr.d2si.ooso.reducer_wrapper.ReducerWrapper | same value as in the configuration file | example-project/target/job.jar | java8 |
We assume that the project jar is located at example-project/target/job.jar.
You may create the lambda functions using the console, the command line or our Terraform template.
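As an illustration, the mappers_driver function could be created with the AWS CLI roughly as follows; the role ARN is a placeholder and the timeout value is an assumption, not a value prescribed by the library:

aws lambda create-function \
    --function-name mappers_driver \
    --runtime java8 \
    --handler fr.d2si.ooso.mappers_driver.MappersDriver \
    --memory-size 1536 \
    --timeout 300 \
    --zip-file fileb://example-project/target/job.jar \
    --role arn:aws:iam::123456789012:role/ooso-lambda-role

The remaining functions follow the same pattern, using the handlers and memory sizes from the table above.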
Note that you only need to deploy the lambdas once. You will be able to run all your jobs, even if your business code changes, without redeploying the infrastructure.
a. The easy way
We provide a fully functional Terraform template that creates everything for you, except the input bucket. This template uses the job configuration file. Here is how to use it:
- install Terraform
- make sure your job configuration file is correct
- run the following commands
cd terraform
terraform plan
terraform apply
For more info about Terraform, check the Terraform documentation.
b. The less easy way
You may use any deployment method you are familiar with: the AWS console, the AWS CLI, Python scripts, and so on. However, we recommend using an Infrastructure-as-Code (IaC) tool such as Terraform or CloudFormation.
In order to run your job, you may execute the main method of the same jar that you used during the deployment, either from your IDE or from the command line as follows:
java -jar job.jar