-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoutput_http.c
136 lines (105 loc) · 3.05 KB
/
output_http.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#include <sys/time.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <glib.h>
#include "output.h"
#include "simplebuffer.h"
#include "libhttp.h"
extern struct http_server *hserver;
static void output_remove_receiver(struct http_receiver_s *hr) {
struct output_s *o=hr->output;
struct http_connection *hc=hr->hc;
o->http_receiver=g_list_remove(o->http_receiver, hr);
o->receiver--;
logwrite(LOG_INFO, "stream_http: dropping connection to %s for %s",
inet_ntoa(hc->sin.sin_addr),
hc->url);
http_drop_connection(hc);
free(hr);
}
static int output_cb_http(struct http_connection *hc, int cbtype, void *arg) {
switch(cbtype) {
case(HCB_QUERY): {
struct http_receiver_s *hr;
struct output_s *o=arg; /* HCB_QUERY returns http_url args */
hr=calloc(1, sizeof(struct http_receiver_s));
hr->hc=hc;
hr->output=o;
/* Put into stream output list */
o->http_receiver=g_list_append(o->http_receiver, hr);
o->receiver++;
/* Store http_receiver into http_connection structure */
hc->arg=hr;
/* Return head */
http_header_start(hc, "200 OK", "application/octet-stream");
http_header_nocache(hc);
http_header_clength(hc, -1);
http_header_end(hc);
logwrite(LOG_INFO, "stream_http: connection from %s for %s",
inet_ntoa(hc->sin.sin_addr),
hc->url);
break;
}
case(HCB_ERROR): {
output_remove_receiver(hc->arg);
break;
}
}
return 1;
}
#define HTTP_MAX_TS (20000/TS_PACKET_SIZE)
int output_init_http(struct output_s *o) {
o->buffer=sb_init(HTTP_MAX_TS, TS_PACKET_SIZE, 0);
if (o->buffer == NULL)
return 0;
o->hurl=calloc(2, sizeof(struct http_url));
o->hurl->url=o->url;
o->hurl->cb=output_cb_http;
o->hurl->arg=(void *) o;
http_register_url(hserver, o->hurl);
return 0;
}
#define HTTP_MAX_QUEUED (200*1024)
#define HTTP_MAX_OVERFLOW 20
void output_send_http_one(gpointer data, gpointer user_data) {
struct http_receiver_s *hr=data;
struct output_s *o=user_data;
/*
* Check how many bytes we already have queued on this
* HTTP connection. Users might connect from low bandwidth
* links not beeing able to transmit the full feed. We must
* avoid consuming all memory.
*
* If the situation persists too long we drop the connection
*
*/
if (http_get_queue(hr->hc) > HTTP_MAX_QUEUED) {
hr->overflow++;
if (hr->overflow > HTTP_MAX_OVERFLOW)
output_remove_receiver(hr);
return;
}
hr->overflow=0;
/*
* We cant reuse evbuffer as they are empty after
* passing the the buffevent layer - Thus we recreate
* the buffer every time we send it out. This involes
* a little more memcpy as really necessary but for now
* its enough
*
*/
http_return_stream(hr->hc, sb_bufptr(o->buffer), sb_buflen(o->buffer));
}
void output_send_http(struct output_s *o, uint8_t *tsp) {
sb_add_atoms(o->buffer, tsp, 1);
if (!sb_free_atoms(o->buffer)) {
/*
* If the output buffer is full - loop on all http sesssions
* and send out the buffer as a http chunk
*/
g_list_foreach(o->http_receiver,
output_send_http_one, (gpointer) o);
sb_zap(o->buffer);
}
}