-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathServer.java
141 lines (128 loc) · 4.02 KB
/
Server.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package bn;
import java.net.Socket;
import java.net.ServerSocket;
import java.net.SocketException;
import java.io.BufferedInputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.EOFException;
import java.util.ArrayList;
import java.util.HashMap;
import bn.MessageRouter;
import bn.StatsUpdater;
/**
 * Handles one accepted connection: reads {@link Message}s from the socket
 * until a goodbye message arrives or reading fails, routes every other
 * message through the {@link MessageRouter}, and records one latency sample
 * per routed message.
 *
 * <p>When the read loop ends, the collected latencies are handed to the
 * {@link StatsUpdater}, the stream and socket are closed, and
 * {@link #isDead()} starts returning {@code true}. The dead flag is set on
 * every exit path, so a caller polling {@link #isDead()} cannot wait forever
 * on a worker whose setup or cleanup failed.
 */
class InConnectionWorker implements Runnable {
    private static final int BUFFER_SIZE = 1024 * 1024;
    private Socket socket;
    private String id;
    private MessageRouter router;
    private StatsUpdater updater;
    private ArrayList<Long> latencies;
    private Controller controller;
    // Written by the worker thread, read by whoever polls isDead().
    private volatile boolean terminated = false;

    /**
     * Creates a worker for an already-accepted connection.
     *
     * @param socket  the accepted client socket; closed by this worker when it finishes
     * @param router  receives every non-goodbye message
     * @param updater receives the recorded latencies when the connection ends
     * @throws Exception declared for compatibility; propagates whatever
     *                   {@code Controller.getInstance()} throws
     */
    public InConnectionWorker(Socket socket, MessageRouter router, StatsUpdater updater) throws Exception {
        this.controller = Controller.getInstance();
        this.socket = socket;
        this.router = router;
        this.updater = updater;
        this.latencies = new ArrayList<Long>();
        this.id = this.socket.getInetAddress() + ":" + String.valueOf(this.socket.getPort());
        System.out.println("Connection `" + this.id + "` connected!");
    }

    /**
     * @return {@code true} once {@link #run()} has finished its cleanup
     */
    public boolean isDead() {
        return terminated;
    }

    /**
     * Reads and routes messages until a goodbye message is received or an
     * exception ends the loop, then releases the connection.
     *
     * <p>A {@link SocketException} is treated as an expected way for the
     * connection to end (for example after {@link #stop()} closed the socket)
     * and is not reported; any other exception has its stack trace printed.
     */
    @Override
    public void run() {
        BufferedInputStream inputStream = null;
        try {
            inputStream = new BufferedInputStream(this.socket.getInputStream(), BUFFER_SIZE);
            while (true) {
                Message message = new Message(inputStream);
                message.incNumHops();
                System.out.println("INCOMING MESSAGE: " + message);
                if (message.isGoodBye()) {
                    break;
                }
                // Latency sample: controller timestamp minus the message's own timestamp.
                latencies.add(controller.getTimestamp() - message.getTimestamp());
                router.route(message);
            }
        } catch (Exception e) {
            if (!(e instanceof SocketException)) {
                e.printStackTrace();
            }
        } finally {
            shutDown(inputStream);
        }
    }

    /**
     * Publishes the latencies and closes the stream and socket. Each step is
     * isolated so a failure in one does not skip the others, and the dead
     * flag is set even if a step fails.
     *
     * @param inputStream the stream opened by {@link #run()}, or {@code null}
     *                    if opening it failed
     */
    private void shutDown(BufferedInputStream inputStream) {
        try {
            try {
                updater.updateStats(latencies);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                this.socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } finally {
            terminated = true;
            System.out.println("Connection `" + this.id + "` disconnected!");
        }
    }

    /**
     * Closes the socket, which makes a read blocked in {@link #run()} fail so
     * the worker can finish. Does not wait for the worker to finish; poll
     * {@link #isDead()} for that.
     *
     * @throws Exception if closing the socket fails
     */
    public void stop() throws Exception {
        socket.close();
    }
}
/**
 * Listens on a server socket and starts one {@link InConnectionWorker} thread
 * per accepted connection.
 *
 * <p>{@link #run()} is meant to execute on its own thread while
 * {@link #stop()} is called from another; the worker list and the stopping
 * flag are therefore only touched while holding the {@code workers} monitor.
 */
class ServerWorker implements Runnable {
    private static final int BUFFER_SIZE = 1024 * 1024 * 128; // 128 MB
    private ServerSocket server = null;
    private MessageRouter router = null;
    private StatsUpdater updater = null;
    // Guarded by synchronized (workers).
    private final ArrayList<InConnectionWorker> workers = new ArrayList<InConnectionWorker>();
    // Guarded by synchronized (workers); set once stop() has begun.
    private boolean stopping = false;

    /**
     * Opens the listening socket.
     *
     * @param port    port to listen on
     * @param router  passed to every connection worker
     * @param updater passed to every connection worker
     * @throws Exception if the socket cannot be configured or bound
     */
    public ServerWorker(int port, MessageRouter router, StatsUpdater updater) throws Exception {
        System.out.println("Trying to start server on port " + port);
        // The receive buffer size has to be set before the socket is bound
        // for a window larger than 64K to take effect, so create the socket
        // unbound, configure it, then bind.
        ServerSocket unbound = new ServerSocket();
        try {
            unbound.setReceiveBufferSize(BUFFER_SIZE);
            unbound.bind(new java.net.InetSocketAddress(port));
        } catch (Exception e) {
            try {
                unbound.close();
            } catch (IOException closeFailure) {
                // Keep reporting the original failure; the close error adds nothing.
            }
            throw e;
        }
        server = unbound;
        System.out.println("Server listening on port " + server.getLocalPort());
        this.router = router;
        this.updater = updater;
    }

    /**
     * Accepts connections until the server socket is closed or an exception
     * occurs. Closing the server socket (see {@link #stop()}) makes
     * {@code accept()} throw a {@link SocketException}, which ends the loop
     * silently; any other exception has its stack trace printed.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Socket socket = server.accept();
                synchronized (workers) {
                    if (stopping) {
                        // Accepted while stop() was in progress: stop() will
                        // not see this connection, so release it here.
                        socket.close();
                        return;
                    }
                    pruneDeadWorkers();
                    InConnectionWorker worker;
                    try {
                        worker = new InConnectionWorker(socket, router, updater);
                    } catch (Exception e) {
                        // No worker owns the socket yet, so close it before giving up.
                        socket.close();
                        throw e;
                    }
                    Thread thread = new Thread(worker);
                    thread.start();
                    workers.add(worker);
                }
            }
        }
        catch (Exception e) {
            // Closing serverSocket will throw Exception then stop listener.
            if (!(e instanceof SocketException)) {
                e.printStackTrace();
            }
        }
    }

    /** Drops workers that have already finished. Caller must hold the {@code workers} monitor. */
    private void pruneDeadWorkers() {
        for (int i = workers.size() - 1; i >= 0; i--) {
            if (workers.get(i).isDead()) {
                workers.remove(i);
            }
        }
    }

    /**
     * Closes the server socket, stops every registered connection worker and
     * waits (polling every 100 ms) until each successfully stopped worker
     * reports that it is dead.
     *
     * <p>All workers are asked to stop even if stopping one of them fails;
     * the first such failure is rethrown after the wait.
     *
     * @throws Exception if closing the server socket or stopping a worker
     *                   fails, or if the waiting thread is interrupted
     */
    public void stop() throws Exception {
        if (server == null) {
            return;
        }
        ArrayList<InConnectionWorker> snapshot;
        synchronized (workers) {
            stopping = true;
            snapshot = new ArrayList<InConnectionWorker>(workers);
        }
        server.close();
        System.out.println("Server Closed");

        Exception firstFailure = null;
        ArrayList<InConnectionWorker> closing = new ArrayList<InConnectionWorker>();
        for (InConnectionWorker worker : snapshot) {
            try {
                worker.stop();
                closing.add(worker);
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        // Only wait for workers whose socket was actually closed; one whose
        // stop() failed may never notice and would block this loop forever.
        for (InConnectionWorker worker : closing) {
            while (!worker.isDead()) {
                Thread.sleep(100);
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
/**
 * Static entry points for starting and stopping the listening server.
 * Remembers the most recently started {@link ServerWorker} so it can be
 * stopped later.
 */
public class Server {
    private static ServerWorker worker;

    /**
     * Creates a {@link ServerWorker} for the given port and runs it on a new thread.
     *
     * @param port    port handed to the {@link ServerWorker}
     * @param router  router handed to the {@link ServerWorker}
     * @param updater stats updater handed to the {@link ServerWorker}
     * @throws Exception if the {@link ServerWorker} cannot be created
     */
    public static void startServer(int port, MessageRouter router, StatsUpdater updater) throws Exception {
        ServerWorker listener = new ServerWorker(port, router, updater);
        worker = listener;
        new Thread(listener).start();
    }

    /**
     * Stops the remembered {@link ServerWorker}; does nothing if no server
     * has been started.
     *
     * @throws Exception if {@link ServerWorker#stop()} fails
     */
    public static void stopServer() throws Exception {
        ServerWorker current = worker;
        if (current == null) {
            return;
        }
        current.stop();
    }
}