-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmapreduce.go
41 lines (35 loc) · 1.37 KB
/
mapreduce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package mapreduce
import (
core "github.com/ipfs/go-ipfs/core"
"github.com/libp2p/go-libp2p-core/protocol"
gorpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/omkarprabhu-98/go-ipfs-mapreduce/core/mapper"
"github.com/omkarprabhu-98/go-ipfs-mapreduce/core/master"
"github.com/omkarprabhu-98/go-ipfs-mapreduce/core/reducer"
)
// protocolID is the libp2p protocol identifier handed to gorpc.NewServer by
// RegisterProtocol and InitMaster.
var protocolID = protocol.ID("/ipfs/mapreduce")
// RegisterProtocol exposes the map and reduce RPC services for node.
//
// It creates a gorpc server on node.PeerHost for protocolID and registers a
// mapper.MapService and a reducer.ReduceService, each bound to node. The first
// registration error is returned; nil means both services were registered.
func RegisterProtocol(node *core.IpfsNode) error {
	rpcHost := gorpc.NewServer(node.PeerHost, protocolID)

	mapService := mapper.MapService{Node: node}
	if err := rpcHost.Register(&mapService); err != nil {
		return err
	}

	reduceService := reducer.ReduceService{Node: node}
	return rpcHost.Register(&reduceService)
}
// InitMaster builds a master.Master for node and registers it as an RPC
// service on a gorpc server created for protocolID.
//
// The arguments are stored on the master as follows: mapFuncFilePath and
// reduceFuncFilePath in the fields of the same name, noOfReducers in
// NoOfReducers, dataFileCid in DataFileCid, and outputFile in MrOutputFile.
// Every map-typed field is allocated empty so it can be written to without
// a nil-map panic.
//
// It returns the registered master, or nil and the error if registration
// fails.
func InitMaster(node *core.IpfsNode, mapFuncFilePath string, reduceFuncFilePath string,
	noOfReducers int, dataFileCid string, outputFile string) (*master.Master, error) {
	// Named m rather than master so the imported master package is not shadowed.
	m := &master.Master{
		Node:               node,
		MapFuncFilePath:    mapFuncFilePath,
		ReduceFuncFilePath: reduceFuncFilePath,
		DataFileCid:        dataFileCid,
		NoOfReducers:       noOfReducers,
		MrOutputFile:       outputFile,
		BlockProviders:     make(map[string][]string),
		MapAllocation:      make(map[string]string),
		ReduceAllocation:   make(map[int]string),
		MapOutput:          make(map[string][]string),
		ReduceFileMap:      make(map[int][]string),
		ReduceOutput:       make(map[int]string),
	}

	// NOTE(review): this creates a second gorpc server on the same protocol ID
	// that RegisterProtocol uses. Confirm that calling both on one node does
	// not replace the stream handler installed by the earlier call.
	rpcHost := gorpc.NewServer(node.PeerHost, protocolID)
	if err := rpcHost.Register(m); err != nil {
		return nil, err
	}
	return m, nil
}