From 9e93f2937e1eca30c5d6faa247c5c772d493c888 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Fri, 20 Jun 2014 18:29:24 +0200 Subject: [PATCH 01/19] figure out which time function to use --- configure | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/configure b/configure index 9e48d2b34..eca59373d 100755 --- a/configure +++ b/configure @@ -59,6 +59,38 @@ do_pkg_config() fi } +#figure out which time function to use +check_time_function() +{ + cat >function_test.c < + +int main(int argc, char **argv){ + struct timespec *tsp; + clock_gettime(CLOCK_MONOTONIC, tsp); +} +FUNC_EOF + + if ${CC} ${CFLAGS} ${LDFLAGS} -c function_test.c > /dev/null 2>&1 /dev/null; then + echo "Found clock_gettime()" + echo -n "Checking if rt lib switch should be added: " + if ${CC} ${CFLAGS} ${LDFLAGS} function_test.c -lrt > /dev/null 2>&1 /dev/null; then + echo "-lrt works, adding" + LDFLAGS="${LDFLAGS} -lrt" + else + echo "-lrt not found, won't add" + fi + else + echo "Checking if we can use mach time" + check_header mach/mach.h + check_header mach/clock.h + export_config MACH_TIME + fi + rm -f function_test.* +} + +check_time_function + do_pkg_config OpenSSL openssl do_pkg_config libao ao CONFIG_AO do_pkg_config PulseAudio libpulse-simple CONFIG_PULSE From 5671cf41a60411c15af1d3a86d6602c9357507d1 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Sat, 1 Feb 2014 12:51:48 +0100 Subject: [PATCH 02/19] add delay reporting to sinks --- audio.h | 3 +++ audio_alsa.c | 23 ++++++++++++++++++++++- audio_ao.c | 3 ++- audio_pipe.c | 3 ++- audio_pulse.c | 16 +++++++++++++++- audio_sndio.c | 24 ++++++++++++++++++++++-- 6 files changed, 66 insertions(+), 6 deletions(-) diff --git a/audio.h b/audio.h index 66c9c2f09..1edbd0be1 100644 --- a/audio.h +++ b/audio.h @@ -17,6 +17,9 @@ typedef struct { // may be NULL, in which case soft volume is applied void (*volume)(double vol); + + //time in us it takes before a new sample is output + long long (*get_delay)(void); } audio_output; audio_output *audio_get_output(char *name); diff --git a/audio_alsa.c b/audio_alsa.c index 6c0f8063c..65cc63be6 100644 --- a/audio_alsa.c +++ b/audio_alsa.c @@ -40,6 +40,7 @@ static void start(int sample_rate); static void play(short buf[], int samples); static void stop(void); static void volume(double vol); +static long long get_delay(void); audio_output audio_alsa = { .name = "alsa", @@ -49,7 +50,8 @@ audio_output audio_alsa = { .start = &start, .stop = &stop, .play = &play, - .volume = NULL + .volume = NULL, + .get_delay = &get_delay }; static snd_pcm_t *alsa_handle = NULL; @@ -65,6 +67,8 @@ static char *alsa_mix_dev = NULL; static char *alsa_mix_ctrl = "Master"; static int alsa_mix_index = 0; +static int device_sample_rate; + static void help(void) { printf(" -d output-device set the output device [default*|...]\n" " -t mixer-type set the mixer type [software*|hardware]\n" @@ -153,6 +157,7 @@ static void deinit(void) { static void start(int sample_rate) { if (sample_rate != 44100) die("Unexpected sample rate!"); + device_sample_rate = sample_rate; int ret, dir = 0; snd_pcm_uframes_t frames = 64; @@ -193,3 +198,19 @@ static void volume(double vol) { if(snd_mixer_selem_set_playback_volume_all(alsa_mix_elem, alsa_volume) != 0) die ("Failed to set playback volume"); } + +static long long get_delay(void) { + snd_pcm_sframes_t frames = 0; + snd_pcm_delay(alsa_handle, &frames); + + if (frames < 0) + { +#if SND_LIB_VERSION >= 0x000901 /* snd_pcm_forward() exists since 0.9.0rc8 */ + snd_pcm_forward(alsa_handle, -frames); +#endif + frames = 0; + } + + return (long long)frames * 1000000LL / (long long)device_sample_rate; +} + diff --git a/audio_ao.c b/audio_ao.c index 1c5f913ea..6f1f5894a 100644 --- a/audio_ao.c +++ b/audio_ao.c @@ -125,5 +125,6 @@ audio_output audio_ao = { .start = &start, .stop = &stop, .play = &play, - .volume = NULL + .volume = NULL, + .get_delay = NULL }; diff --git a/audio_pipe.c b/audio_pipe.c index 759e73d01..7e6ad15b3 100644 --- a/audio_pipe.c +++ b/audio_pipe.c @@ -136,5 +136,6 @@ audio_output audio_pipe = { .start = &start, .stop = &stop, .play = &play, - .volume = NULL + .volume = NULL, + .get_delay = NULL }; diff --git a/audio_pulse.c b/audio_pulse.c index b2142b8e5..c120de8a1 100644 --- a/audio_pulse.c +++ b/audio_pulse.c @@ -116,6 +116,18 @@ static void stop(void) { fprintf(stderr, __FILE__": pa_simple_drain() failed: %s\n", pa_strerror(pa_error)); } +static long long get_delay() { + pa_usec_t latency; + latency = pa_simple_get_latency(pa_dev, &pa_error); + if (pa_error < 0 ) + { + latency = (pa_usec_t) 0; + fprintf(stderr, __FILE__": get_delay() failed: %s\n", pa_strerror(pa_error)); + } + + return (long long)latency; +} + audio_output audio_pulse = { .name = "pulse", .help = &help, @@ -124,5 +136,7 @@ audio_output audio_pulse = { .start = &start, .stop = &stop, .play = &play, - .volume = NULL + .volume = NULL, + .get_delay = &get_delay }; + diff --git a/audio_sndio.c b/audio_sndio.c index 84fc64738..e1c0a2829 100644 --- a/audio_sndio.c +++ b/audio_sndio.c @@ -23,6 +23,14 @@ static struct sio_hdl *sio; static struct sio_par par; +long long writepos; /* frames written */ + + +static void position_changed(void *addr, int delta) +{ + writepos -= delta; +} + static int init(int argc, char **argv) { sio = sio_open(SIO_DEVANY, SIO_PLAY, 0); if (!sio) @@ -41,6 +49,8 @@ static int init(int argc, char **argv) { if (!sio_getpar(sio, &par)) die("sndio: failed to get audio parameters"); + sio_onmove(sio, position_changed, NULL); + return 0; } @@ -51,11 +61,14 @@ static void deinit(void) { static void start(int sample_rate) { if (sample_rate != par.rate) die("unexpected sample rate!"); + writepos = 0; sio_start(sio); } static void play(short buf[], int samples) { - sio_write(sio, (char *)buf, samples * par.bps * par.pchan); + int bytes_per_frame = par.bps * par.pchan; + sio_write(sio, (char *)buf, samples * bytes_per_frame); + writepos += samples; } static void stop(void) { @@ -72,6 +85,12 @@ static void volume(double vol) { sio_setvol(sio, v); } +static long long get_delay(void) { + long long delay = writepos * 1000000LL / (long long)par.rate; + + return delay; +} + audio_output audio_sndio = { .name = "sndio", .help = &help, @@ -80,5 +99,6 @@ audio_output audio_sndio = { .start = &start, .stop = &stop, .play = &play, - .volume = &volume + .volume = &volume, + .get_delay = get_delay }; From 6a920600166242fee1b3d8183502585a5c0d5c48 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Tue, 14 Jan 2014 19:50:12 +0100 Subject: [PATCH 03/19] add timing functions --- rtp.c | 308 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- rtp.h | 5 +- rtsp.c | 4 +- 3 files changed, 290 insertions(+), 27 deletions(-) diff --git a/rtp.c b/rtp.c index 51b846d5d..a9e8bd1a0 100644 --- a/rtp.c +++ b/rtp.c @@ -32,47 +32,189 @@ #include #include #include +#include +#include "config.h" #include "common.h" #include "player.h" +#ifdef MACH_TIME +#include +#include +#endif + +#define NTPCACHESIZE 7 // only one RTP session can be active at a time. static int running = 0; static int please_shutdown; static SOCKADDR rtp_client; -static int sock; +static SOCKADDR rtp_timing; +static socklen_t addrlen; +static int server_sock; +static int timing_sock; static pthread_t rtp_thread; +static pthread_t ntp_receive_thread; +static pthread_t ntp_send_thread; +long long ntp_cache[NTPCACHESIZE + 1]; +static int strict_rtp; + +void rtp_record(int rtp_mode){ + debug(2, "Setting strict_rtp to %d\n", rtp_mode); + strict_rtp = rtp_mode; +} + +static void get_current_time(struct timespec *tsp) { +#ifdef MACH_TIME + kern_return_t retval = KERN_SUCCESS; + clock_serv_t cclock; + mach_timespec_t mts; + + host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); + retval = clock_get_time(cclock, &mts); + mach_port_deallocate(mach_task_self(), cclock); + + tsp->tv_sec = mts.tv_sec; + tsp->tv_nsec = mts.tv_nsec; +#else + clock_gettime(CLOCK_MONOTONIC, tsp); +#endif +} + +static void reset_ntp_cache() { + int i; + for (i = 0; i < NTPCACHESIZE; i++) { + ntp_cache[i] = LLONG_MIN; + } + ntp_cache[NTPCACHESIZE] = 0; +} + +long long get_ntp_offset() { + return ntp_cache[NTPCACHESIZE]; +} + +static void update_ntp_cache(long long offset, long long arrival_time) { + // average the offsets, filter out outliers + + int i, d, minindex, maxindex; + long long total; + + for (i = 0; i < (NTPCACHESIZE - 1); i++) { + ntp_cache[i] = ntp_cache[i+1]; + } + ntp_cache[NTPCACHESIZE - 1] = offset; + + d = 0; + minindex = 0; + maxindex = 0; + for (i = 0; i < NTPCACHESIZE; i++) { + if (ntp_cache[i] != LLONG_MIN) { + d++; + minindex = (ntp_cache[i] < ntp_cache[minindex] ? i : minindex); + maxindex = (ntp_cache[i] > ntp_cache[maxindex] ? i : maxindex); + } + } + debug(2, "ntp: valid entries: %d\n", d); + if (d < 5) + minindex = maxindex = -1; + d = 0; + total = 0; + for (i = 0; i < NTPCACHESIZE; i++) { + debug(3, "ntp[%d]: %lld, d: %d\n", i, ntp_cache[i] , d); + if ((ntp_cache[i] != LLONG_MIN) && (i != minindex) && (i != maxindex)) { + d++; + total += ntp_cache[i]; + } + } + ntp_cache[NTPCACHESIZE] = total / d; + debug(2, "ntp: offset: %lld, d: %d\n", ntp_cache[NTPCACHESIZE], d); +} + +static long long tspk_to_us(struct timespec tspk) { + long long usecs; + + usecs = tspk.tv_sec * 1000000LL; + + return usecs + (tspk.tv_nsec / 1000); +} + +long long tstp_us() { + struct timespec tv; + get_current_time(&tv); + return tspk_to_us(tv); +} + +static long long ntp_tsp_to_us(uint32_t timestamp_hi, uint32_t timestamp_lo) { + long long timetemp; + + timetemp = (long long)timestamp_hi * 1000000LL; + timetemp += ((long long)timestamp_lo * 1000000LL) >> 32; + + return timetemp; +} static void *rtp_receiver(void *arg) { // we inherit the signal mask (SIGUSR1) uint8_t packet[2048], *pktp; - + sync_cfg sync_tag, no_tag; + sync_cfg * pkt_tag; + int sync_fresh = 0; ssize_t nread; + + no_tag.rtp_tsp = 0; + no_tag.ntp_tsp = 0; + no_tag.sync_mode = NOSYNC; + while (1) { if (please_shutdown) break; - nread = recv(sock, packet, sizeof(packet), 0); + nread = recv(server_sock, packet, sizeof(packet), 0); if (nread < 0) break; ssize_t plen = nread; uint8_t type = packet[1] & ~0x80; - if (type == 0x54) // sync + if (type==0x54) { // sync + if (plen != 20) { + warn("Sync packet with wrong length %d received\n", plen); + continue; + } + + sync_tag.rtp_tsp = ntohl(*(uint32_t *)(packet+16)); + debug(3, "Sync packet rtp_tsp %lu\n", sync_tag.rtp_tsp); + sync_tag.ntp_tsp = ntp_tsp_to_us(ntohl(*(uint32_t *)(packet+8)), ntohl(*(uint32_t *)(packet+12))); + debug(3, "Sync packet ntp_tsp %lld\n", sync_tag.ntp_tsp); + // check if extension bit is set; this will be the case for the first sync + sync_tag.sync_mode = ((packet[0] & 0x10) ? E_NTPSYNC : NTPSYNC); + sync_fresh = 1; continue; + } if (type == 0x60 || type == 0x56) { // audio data / resend pktp = packet; if (type==0x56) { pktp += 4; plen -= 4; } - seq_t seqno = ntohs(*(unsigned short *)(pktp+2)); + + seq_t seqno = ntohs(*(uint16_t *)(pktp+2)); + unsigned long rtp_tsp = ntohl(*(uint32_t *)(pktp+4)); pktp += 12; plen -= 12; // check if packet contains enough content to be reasonable if (plen >= 16) { - player_put_packet(seqno, pktp, plen); + // strict -> find a rtp match, this might happen on resend packets, or, + // in weird network circumstances, even more than once. + // non-strickt -> just stick it to the first audio packet, _once_ + if ((strict_rtp && (rtp_tsp == sync_tag.rtp_tsp)) + || (!strict_rtp && sync_fresh && (type == 0x60))) { + debug(2, "Packet for with sync data was sent has arrived (%04X)\n", seqno); + pkt_tag = &sync_tag; + sync_fresh = 0; + } else + pkt_tag = &no_tag; + + player_put_packet(seqno, *pkt_tag, pktp, plen); continue; } if (type == 0x56 && seqno == 0) { @@ -86,12 +228,98 @@ static void *rtp_receiver(void *arg) { } debug(1, "RTP thread interrupted. terminating.\n"); - close(sock); return NULL; } -static int bind_port(SOCKADDR *remote) { +static void *ntp_receiver(void *arg) { + // we inherit the signal mask (SIGUSR1) + uint8_t packet[2048]; + struct timespec tv; + + ssize_t nread; + while (1) { + if (please_shutdown) + break; + nread = recv(timing_sock, packet, sizeof(packet), 0); + if (nread < 0) + break; + get_current_time(&tv); + + ssize_t plen = nread; + uint8_t type = packet[1] & ~0x80; + if (type == 0x53) { + if (plen != 32) { + warn("Timing packet with wrong length %d received\n", plen); + continue; + } + long long ntp_ref_tsp = ntp_tsp_to_us(ntohl(*(uint32_t *)(packet+8)), ntohl(*(uint32_t *)(packet+12))); + debug(2, "Timing packet ntp_ref_tsp %lld\n", ntp_ref_tsp); + long long ntp_rec_tsp = ntp_tsp_to_us(ntohl(*(uint32_t *)(packet+16)), ntohl(*(uint32_t *)(packet+20))); + debug(2, "Timing packet ntp_rec_tsp %lld\n", ntp_rec_tsp); + long long ntp_sen_tsp = ntp_tsp_to_us(ntohl(*(uint32_t *)(packet+24)), ntohl(*(uint32_t *)(packet+28))); + debug(2, "Timing packet ntp_sen_tsp %lld\n", ntp_sen_tsp); + long long ntp_loc_tsp = tspk_to_us(tv); + debug(2, "Timing packet ntp_loc_tsp %lld\n", ntp_loc_tsp); + + // from the ntp spec: + // d = (t4 - t1) - (t3 - t2) and c = (t2 - t1 + t3 - t4)/2 + long long d = (ntp_loc_tsp - ntp_ref_tsp) - (ntp_sen_tsp - ntp_rec_tsp); + long long c = ((ntp_rec_tsp - ntp_ref_tsp) + (ntp_sen_tsp - ntp_loc_tsp)) / 2; + + debug(2, "Round-trip delay %lld us\n", d); + debug(2, "Clock offset %lld us\n", c); + update_ntp_cache(c, ntp_loc_tsp); + + continue; + } + warn("Unknown Timing packet of type 0x%02X length %d", type, nread); + } + + debug(1, "Time receive thread interrupted. terminating.\n"); + + return NULL; +} + +static void *ntp_sender(void *arg) { + // we inherit the signal mask (SIGUSR1) + int i = 0; + int cc; + struct timespec tv; + char req[32]; + memset(req, 0, sizeof(req)); + + while (1) { + // at startup, we send more timing request to fill up the cache + if (please_shutdown) + break; + + req[0] = 0x80; + req[1] = 0x52|0x80; // Apple 'ntp request' + *(uint16_t *)(req+2) = htons(7); // seq no, needs to be 7 or iTunes won't respond + + get_current_time(&tv); + *(uint32_t *)(req+24) = htonl((uint32_t)tv.tv_sec); + *(uint32_t *)(req+28) = htonl((uint32_t)tv.tv_nsec * 0x100000000 / (1000 * 1000 * 1000)); + + cc = sendto(timing_sock, req, sizeof(req), 0, (struct sockaddr*)&rtp_timing, addrlen); + if (cc < 0){ + die("send packet failed in send_timing_packet. error(%d)\n", errno); + } + debug(2, "Current time s:%lu us:%lu\n", (unsigned int) tv.tv_sec, (unsigned int) tv.tv_nsec / 1000); + // todo: randomize time at which to send timing packets to avoid timing floods at the client + if (i<2){ + i++; + usleep(50000); + } else + sleep(3); + } + + debug(1, "Time send thread interrupted. terminating.\n"); + + return NULL; +} +static struct addrinfo *get_address_info(SOCKADDR *remote) { struct addrinfo hints, *info; memset(&hints, 0, sizeof(hints)); @@ -104,10 +332,16 @@ static int bind_port(SOCKADDR *remote) { if (ret < 0) die("failed to get usable addrinfo?! %s", gai_strerror(ret)); - sock = socket(remote->SAFAMILY, SOCK_DGRAM, IPPROTO_UDP); - ret = bind(sock, info->ai_addr, info->ai_addrlen); + return info; +} - freeaddrinfo(info); +static int bind_port(struct addrinfo *info, int *sock) { + int ret; + + if (sock == NULL) + die("socket is NULL"); + *sock = socket(info->ai_family, SOCK_DGRAM, IPPROTO_UDP); + ret = bind(*sock, info->ai_addr, info->ai_addrlen); if (ret < 0) die("could not bind a UDP port!"); @@ -115,7 +349,7 @@ static int bind_port(SOCKADDR *remote) { int sport; SOCKADDR local; socklen_t local_len = sizeof(local); - getsockname(sock, (struct sockaddr*)&local, &local_len); + getsockname(*sock, (struct sockaddr*)&local, &local_len); #ifdef AF_INET6 if (local.SAFAMILY == AF_INET6) { struct sockaddr_in6 *sa6 = (struct sockaddr_in6*)&local; @@ -130,37 +364,53 @@ static int bind_port(SOCKADDR *remote) { return sport; } +int rtp_setup(SOCKADDR *remote, int *cport, int *tport) { + // we take the client's cport and tport as input and overwrite them with our own + // we only create two sockets instead of three, combining control and data + // allows for one, simpler rtp receive thread + int server_port; + struct addrinfo *info; -int rtp_setup(SOCKADDR *remote, int cport, int tport) { if (running) die("rtp_setup called with active stream!"); - debug(1, "rtp_setup: cport=%d tport=%d\n", cport, tport); - - // we do our own timing and ignore the timing port. - // an audio perfectionist may wish to learn the protocol. - memcpy(&rtp_client, remote, sizeof(rtp_client)); + memcpy(&rtp_timing, remote, sizeof(rtp_timing)); #ifdef AF_INET6 if (rtp_client.SAFAMILY == AF_INET6) { struct sockaddr_in6 *sa6 = (struct sockaddr_in6*)&rtp_client; - sa6->sin6_port = htons(cport); + sa6->sin6_port = htons(*cport); + struct sockaddr_in6 *sa6_t = (struct sockaddr_in6*)&rtp_timing; + sa6_t->sin6_port = htons(*tport); } else #endif { struct sockaddr_in *sa = (struct sockaddr_in*)&rtp_client; - sa->sin_port = htons(cport); + sa->sin_port = htons(*cport); + struct sockaddr_in *sa_t = (struct sockaddr_in*)&rtp_timing; + sa_t->sin_port = htons(*tport); } - int sport = bind_port(remote); + // since we create sockets all alike the remote's, the address length + // is equal for all + info = get_address_info(remote); + addrlen = info->ai_addrlen; + + *cport = bind_port(info, &server_sock); + server_port = *cport; + *tport = bind_port(info, &timing_sock); + freeaddrinfo(info); + debug(1, "Rtp listening on dataport %d, controlport %d. Timing port is %d.\n", server_port, *cport, *tport); - debug(1, "rtp listening on port %d\n", sport); + reset_ntp_cache(); please_shutdown = 0; pthread_create(&rtp_thread, NULL, &rtp_receiver, NULL); + pthread_create(&ntp_receive_thread, NULL, &ntp_receiver, NULL); + pthread_create(&ntp_send_thread, NULL, &ntp_sender, NULL); running = 1; - return sport; + return server_port; } void rtp_shutdown(void) { @@ -170,12 +420,19 @@ void rtp_shutdown(void) { debug(2, "shutting down RTP thread\n"); please_shutdown = 1; pthread_kill(rtp_thread, SIGUSR1); + pthread_kill(ntp_receive_thread, SIGUSR1); + pthread_kill(ntp_send_thread, SIGUSR1); void *retval; pthread_join(rtp_thread, &retval); + pthread_join(ntp_receive_thread, &retval); + pthread_join(ntp_send_thread, &retval); + close(server_sock); + close(timing_sock); running = 0; } void rtp_request_resend(seq_t first, seq_t last) { + int cc; if (!running) die("rtp_request_resend called without active stream!"); @@ -189,5 +446,8 @@ void rtp_request_resend(seq_t first, seq_t last) { *(unsigned short *)(req+4) = htons(first); // missed seqnum *(unsigned short *)(req+6) = htons(last-first+1); // count - sendto(sock, req, sizeof(req), 0, (struct sockaddr*)&rtp_client, sizeof(rtp_client)); + cc = sendto(server_sock, req, sizeof(req), 0, (struct sockaddr*)&rtp_client, addrlen); + if (cc < 0){ + die("send packet failed in rtp_request_resend. error(%d)\n", errno); + } } diff --git a/rtp.h b/rtp.h index 2cb4d3945..d9c1d1d96 100644 --- a/rtp.h +++ b/rtp.h @@ -3,8 +3,11 @@ #include -int rtp_setup(SOCKADDR *remote, int controlport, int timingport); +int rtp_setup(SOCKADDR *remote, int *controlport, int *timingport); +void rtp_record(int rtp_mode); void rtp_shutdown(void); void rtp_request_resend(seq_t first, seq_t last); +long long get_ntp_offset(); +long long tstp_us(); #endif // _RTP_H diff --git a/rtsp.c b/rtsp.c index 8db9f3f24..8490275de 100644 --- a/rtsp.c +++ b/rtsp.c @@ -407,7 +407,7 @@ static void handle_setup(rtsp_conn_info *conn, tport = atoi(p); rtsp_take_player(); - int sport = rtp_setup(&conn->remote, cport, tport); + int sport = rtp_setup(&conn->remote, &cport, &tport); if (!sport) return; @@ -416,7 +416,7 @@ static void handle_setup(rtsp_conn_info *conn, char resphdr[100]; snprintf(resphdr, sizeof(resphdr), "RTP/AVP/UDP;unicast;mode=record;server_port=%d;control_port=%d;timing_port=%d", - sport, sport, sport); + sport, cport, tport); msg_add_header(resp, "Transport", resphdr); msg_add_header(resp, "Session", "1"); From 46896cd5a04b91b91a25f3318a39646b19dfa404 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Tue, 6 May 2014 16:45:53 +0200 Subject: [PATCH 04/19] synchronised playback --- common.h | 1 + player.c | 250 ++++++++++++++++++++++++++++++++++++++++++---------- player.h | 16 +++- rtsp.c | 70 ++++++++++++++- shairport.c | 13 +-- 5 files changed, 291 insertions(+), 59 deletions(-) diff --git a/common.h b/common.h index 811d3f9cf..014c6a300 100644 --- a/common.h +++ b/common.h @@ -27,6 +27,7 @@ typedef struct { char *mdns_name; mdns_backend *mdns; int buffer_start_fill; + int delay; int daemonise; char *cmd_start, *cmd_stop; int cmd_blocking; diff --git a/player.c b/player.c index bc569c338..ccc77a7c1 100644 --- a/player.c +++ b/player.c @@ -77,9 +77,22 @@ static pthread_mutex_t vol_mutex = PTHREAD_MUTEX_INITIALIZER; // needs to be a power of 2 because of the way BUFIDX(seqno) works #define BUFFER_FRAMES 512 #define MAX_PACKET 2048 +static int sane_buffer_size; + +//player states +#define BUFFERING 0 +#define SYNCING 1 +#define PLAYING 2 +int state; + +//buffer states +#define SIGNALLOSS 0 +#define UNSYNC 1 +#define INSYNC 2 typedef struct audio_buffer_entry { // decoded audio packets int ready; + sync_cfg sync; signed short *data; } abuf_t; static abuf_t audio_buffer[BUFFER_FRAMES]; @@ -100,6 +113,21 @@ static void ab_resync(void) { ab_buffering = 1; } +// reset the audio frames in the range to NOT ready +static void ab_reset(seq_t from, seq_t to) { + abuf_t *abuf = 0; + + while (seq_diff(from, to)) { + if (seq_diff(from, to) >= BUFFER_FRAMES) { + from = from + BUFFER_FRAMES; + } else { + abuf = audio_buffer + BUFIDX(from); + abuf->ready = 0; + from++; + } + } +} + // the sequence numbers will wrap pretty often. // this returns true if the second arg is after the first static inline int seq_order(seq_t a, seq_t b) { @@ -188,17 +216,28 @@ static void free_buffer(void) { free(audio_buffer[i].data); } -void player_put_packet(seq_t seqno, uint8_t *data, int len) { +static long us_to_frames(long long us) { + return us * sampling_rate / 1000000; +} + +static inline long long get_sync_time(long long ntp_tsp) { + long long sync_time_est; + sync_time_est = (ntp_tsp + config.delay) - (tstp_us() + get_ntp_offset() + config.output->get_delay()); + return sync_time_est; +} + +void player_put_packet(seq_t seqno, sync_cfg sync_tag, uint8_t *data, int len) { abuf_t *abuf = 0; int16_t buf_fill; pthread_mutex_lock(&ab_mutex); - if (!ab_synced) { - debug(2, "syncing to first seqno %04X\n", seqno); + if (ab_synced == SIGNALLOSS) { + debug(2, "picking up first seqno %04X\n", seqno); ab_write = seqno-1; ab_read = seqno; - ab_synced = 1; + ab_synced = UNSYNC; } + debug(3, "packet: ab_write %04X, ab_read %04X, seqno %04X\n", ab_write, ab_read, seqno); if (seq_diff(ab_write, seqno) == 1) { // expected packet abuf = audio_buffer + BUFIDX(seqno); ab_write = seqno; @@ -206,24 +245,43 @@ void player_put_packet(seq_t seqno, uint8_t *data, int len) { rtp_request_resend(ab_write+1, seqno-1); abuf = audio_buffer + BUFIDX(seqno); ab_write = seqno; - } else if (seq_order(ab_read, seqno)) { // late but not yet played + } else if (seq_order(ab_read - 1, seqno)) { // late but not yet played abuf = audio_buffer + BUFIDX(seqno); } else { // too late. - debug(1, "late packet %04X (%04X:%04X)", seqno, ab_read, ab_write); + debug(1, "late packet %04X (%04X:%04X)\n", seqno, ab_read, ab_write); } buf_fill = seq_diff(ab_read, ab_write); pthread_mutex_unlock(&ab_mutex); if (abuf) { alac_decode(abuf->data, data, len); + abuf->sync.rtp_tsp = sync_tag.rtp_tsp; + // sync packets with extension bit seem to be one audio packet off: + // if the extension bit was set, pull back the ntp time by one packet's time + if (sync_tag.sync_mode == E_NTPSYNC) { + abuf->sync.ntp_tsp = sync_tag.ntp_tsp - (long long)frame_size * 1000000LL / (long long)sampling_rate; + abuf->sync.sync_mode = NTPSYNC; + } else { + abuf->sync.ntp_tsp = sync_tag.ntp_tsp; + abuf->sync.sync_mode = sync_tag.sync_mode; + } abuf->ready = 1; } pthread_mutex_lock(&ab_mutex); - if (ab_buffering && buf_fill >= config.buffer_start_fill) { + if (ab_synced == UNSYNC && (sync_tag.sync_mode == NTPSYNC)) { + // only stop buffering when the new frame is a timestamp with good sync + long long sync_time = get_sync_time(sync_tag.ntp_tsp); + if (sync_time > (config.delay/8)) { + debug(1, "found good sync (%04X:%04X) sync: %lld\n", ab_read, ab_write, sync_time); + ab_synced = INSYNC; + } + ab_reset(ab_read, seqno); + ab_read = seqno; + } + if (ab_synced == INSYNC && ab_buffering && buf_fill >= sane_buffer_size) { debug(1, "buffering over. starting play\n"); ab_buffering = 0; - bf_est_reset(buf_fill); } pthread_mutex_unlock(&ab_mutex); } @@ -339,38 +397,41 @@ static void bf_est_update(short fill) { } // get the next frame, when available. return 0 if underrun/stream reset. -static short *buffer_get_frame(void) { +static short *buffer_get_frame(sync_cfg *sync_tag) { int16_t buf_fill; seq_t read, next; abuf_t *abuf = 0; int i; - if (ab_buffering) - return 0; + sync_tag->sync_mode = NOSYNC; pthread_mutex_lock(&ab_mutex); + if (ab_buffering) { + pthread_mutex_unlock(&ab_mutex); + return 0; + } buf_fill = seq_diff(ab_read, ab_write); - if (buf_fill < 1 || !ab_synced) { + if (buf_fill < 1) { if (buf_fill < 1) warn("underrun."); ab_buffering = 1; + ab_synced = SIGNALLOSS; + state = BUFFERING; pthread_mutex_unlock(&ab_mutex); return 0; } if (buf_fill >= BUFFER_FRAMES) { // overrunning! uh-oh. restart at a sane distance warn("overrun."); - ab_read = ab_write - config.buffer_start_fill; + ab_read = ab_write - sane_buffer_size; } read = ab_read; ab_read++; - buf_fill = seq_diff(ab_read, ab_write); - bf_est_update(buf_fill); - // check if t+16, t+32, t+64, t+128, ... (buffer_start_fill / 2) + // check if t+16, t+32, t+64, t+128, ... (sane_buffer_size / 2) // packets have arrived... last-chance resend if (!ab_buffering) { - for (i = 16; i < (config.buffer_start_fill / 2); i = (i * 2)) { + for (i = 16; i < (sane_buffer_size / 2); i = (i * 2)) { next = ab_read + i; abuf = audio_buffer + BUFIDX(next); if (!abuf->ready) { @@ -384,7 +445,11 @@ static short *buffer_get_frame(void) { debug(1, "missing frame %04X.", read); memset(curframe->data, 0, FRAME_BYTES(frame_size)); } + curframe->ready = 0; + sync_tag->rtp_tsp = curframe->sync.rtp_tsp; + sync_tag->ntp_tsp = curframe->sync.ntp_tsp; + sync_tag->sync_mode = curframe->sync.sync_mode; pthread_mutex_unlock(&ab_mutex); return curframe->data; @@ -410,12 +475,12 @@ static int stuff_buffer(double playback_rate, short *inptr, short *outptr) { }; if (stuff) { if (stuff==1) { - debug(2, "+++++++++\n"); + debug(3, "+++++++++\n"); // interpolate one sample *outptr++ = dithered_vol(((long)inptr[-2] + (long)inptr[0]) >> 1); *outptr++ = dithered_vol(((long)inptr[-1] + (long)inptr[1]) >> 1); } else if (stuff==-1) { - debug(2, "---------\n"); + debug(3, "---------\n"); inptr++; inptr++; } @@ -429,12 +494,23 @@ static int stuff_buffer(double playback_rate, short *inptr, short *outptr) { return frame_size + stuff; } -static void *player_thread_func(void *arg) { - int play_samples; +//constant first-order filter +#define ALPHA 0.945 +#define LOSS 850000.0 + +static double bf_playback_rate = 1.0; - signed short *inbuf, *outbuf, *silence; - outbuf = malloc(OUTFRAME_BYTES(frame_size)); - silence = malloc(OUTFRAME_BYTES(frame_size)); +static void *player_thread_func(void *arg) { + int play_samples = frame_size; + sync_cfg sync_tag; + long long sync_time; + double sync_time_diff = 0.0; + long sync_frames = 0; + state = BUFFERING; + + signed short *inbuf, *outbuf, *resbuf, *silence; + outbuf = resbuf = malloc(OUTFRAME_BYTES(frame_size)); + inbuf = silence = malloc(OUTFRAME_BYTES(frame_size)); memset(silence, 0, OUTFRAME_BYTES(frame_size)); #ifdef FANCY_RESAMPLING @@ -452,33 +528,93 @@ static void *player_thread_func(void *arg) { srcdat.end_of_input = 0; } #endif - + debug(1,"Player STATE: %d\n", state); while (!please_stop) { - inbuf = buffer_get_frame(); - if (!inbuf) - inbuf = silence; - -#ifdef FANCY_RESAMPLING - if (fancy_resampling) { - int i; - pthread_mutex_lock(&vol_mutex); - for (i=0; i<2*FRAME_BYTES(frame_size); i++) { - frame[i] = (float)inbuf[i] / 32768.0; - frame[i] *= volume; + switch (state) { + case BUFFERING: { + inbuf = buffer_get_frame(&sync_tag); + // as long as the buffer keeps returning NULL, we assume it is still filling up + if (inbuf) { + if (sync_tag.sync_mode != NOSYNC) { + // figure out how much silence to insert before playback starts + sync_frames = us_to_frames(get_sync_time(sync_tag.ntp_tsp)); + } else { + // what if first packet(s) is lost? + warn("Ouch! first packet has no sync...\n"); + sync_frames = us_to_frames(config.delay) - config.output->get_delay(); + } + if (sync_frames < 0) + sync_frames = 0; + debug(1, "Fill with %ld frames and %ld samples\n", sync_frames / frame_size , sync_frames % frame_size); + state = SYNCING; + debug(1,"Changing player STATE: %d\n", state); } - pthread_mutex_unlock(&vol_mutex); - srcdat.src_ratio = bf_playback_rate; - src_process(src, &srcdat); - assert(srcdat.input_frames_used == FRAME_BYTES(frame_size)); - src_float_to_short_array(outframe, outbuf, FRAME_BYTES(frame_size)*2); - play_samples = srcdat.output_frames_gen; - } else + outbuf = silence; + play_samples = frame_size; + break; + } + case SYNCING: { + if (sync_frames > 0) { + if (((sync_frames < frame_size * 50) && (sync_frames >= frame_size * 49)) && \ + (sync_tag.sync_mode != NOSYNC)) { + debug(3,"sync_frames adjusting: %d->", sync_frames); + // figure out how much silence to insert before playback starts + sync_frames = us_to_frames(get_sync_time(sync_tag.ntp_tsp)); + if (sync_frames < 0) + sync_frames = 0; + debug(3,"%d\n", sync_frames); + } + play_samples = (sync_frames >= frame_size ? frame_size : sync_frames); + outbuf = silence; + sync_frames -= play_samples; + + debug(3,"Samples to go before playback start: %d\n", sync_frames); + } else { + outbuf = resbuf; + play_samples = stuff_buffer(bf_playback_rate, inbuf, outbuf); + state = PLAYING; + debug(1,"Changing player STATE: %d\n", state); + } + break; + } + case PLAYING: { + inbuf = buffer_get_frame(&sync_tag); + if (!inbuf) + inbuf = silence; +#ifdef FANCY_RESAMPLING + if (fancy_resampling) { + int i; + pthread_mutex_lock(&vol_mutex); + for (i=0; i<2*FRAME_BYTES(frame_size); i++) { + frame[i] = (float)inbuf[i] / 32768.0; + frame[i] *= volume; + } + pthread_mutex_unlock(&vol_mutex); + srcdat.src_ratio = bf_playback_rate; + src_process(src, &srcdat); + assert(srcdat.input_frames_used == FRAME_BYTES(frame_size)); + src_float_to_short_array(outframe, outbuf, FRAME_BYTES(frame_size)*2); + play_samples = srcdat.output_frames_gen; + } else #endif + if (sync_tag.sync_mode == NTPSYNC) { + //check if we're still in sync. + sync_time = get_sync_time(sync_tag.ntp_tsp); + sync_time_diff = (ALPHA * sync_time_diff) + (1.0- ALPHA) * (double)sync_time; + bf_playback_rate = 1.0 - (sync_time_diff / LOSS); + debug(2, "Playback rate %f, sync_time %lld\n", bf_playback_rate, sync_time); + } play_samples = stuff_buffer(bf_playback_rate, inbuf, outbuf); - + break; + } + default: + break; + } config.output->play(outbuf, play_samples); } + free(resbuf); + free(silence); return 0; } @@ -495,17 +631,27 @@ void player_volume(double f) { pthread_mutex_unlock(&vol_mutex); } } -void player_flush(void) { + +unsigned long player_flush(int seqno, unsigned long rtp_tsp) { + unsigned long result = 0; pthread_mutex_lock(&ab_mutex); + abuf_t *curframe = audio_buffer + BUFIDX(ab_read); + if (curframe->ready) { + result = curframe->sync.rtp_tsp; + } + ab_resync(); + ab_write = seqno-1; + ab_read = seqno; + // a negative seqno mean the client did not supply one, so we will + // treat the first audio packet that comes along, as the first in the audio stream + ab_synced = (seqno < 0 ? SIGNALLOSS : UNSYNC); pthread_mutex_unlock(&ab_mutex); + state = BUFFERING; + return result; } int player_play(stream_cfg *stream) { - if (config.buffer_start_fill > BUFFER_FRAMES) - die("specified buffer starting fill %d > buffer size %d", - config.buffer_start_fill, BUFFER_FRAMES); - AES_set_decrypt_key(stream->aeskey, 128, &aes); aesiv = stream->aesiv; init_decoder(stream->fmtp); @@ -515,6 +661,12 @@ int player_play(stream_cfg *stream) { init_src(); #endif + sane_buffer_size = ((config.delay / 1000) * sampling_rate * 2) / (frame_size * 1000 * 3); + sane_buffer_size = (sane_buffer_size >= 10 ? sane_buffer_size : 10); + if (sane_buffer_size > BUFFER_FRAMES) + die("buffer starting fill %d > buffer size %d", sane_buffer_size, BUFFER_FRAMES); + debug(1, "buffer size set to %d\n", sane_buffer_size); + please_stop = 0; command_start(); config.output->start(sampling_rate); diff --git a/player.h b/player.h index 0d52cb458..d8b1809af 100644 --- a/player.h +++ b/player.h @@ -1,9 +1,21 @@ #ifndef _PLAYER_H #define _PLAYER_H +#include #include "audio.h" #include "metadata.h" +#define NOSYNC 0 +#define NTPSYNC 1 +#define RTPSYNC 2 +#define E_NTPSYNC 3 + +typedef struct { + long long ntp_tsp; + unsigned long rtp_tsp; + int sync_mode; +} sync_cfg; + typedef struct { uint8_t aesiv[16], aeskey[16]; int32_t fmtp[12]; @@ -24,9 +36,9 @@ void player_volume(double f); void player_metadata(); void player_cover_image(char *buf, int len, char *ext); void player_cover_clear(); -void player_flush(void); +unsigned long player_flush(int seqno, unsigned long rtp_tsp); void player_resync(void); -void player_put_packet(seq_t seqno, uint8_t *data, int len); +void player_put_packet(seq_t seqno, sync_cfg sync_tag, uint8_t *data, int len); #endif //_PLAYER_H diff --git a/rtsp.c b/rtsp.c index 8490275de..d27c30e0c 100644 --- a/rtsp.c +++ b/rtsp.c @@ -380,10 +380,39 @@ static void handle_teardown(rtsp_conn_info *conn, static void handle_flush(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { + // the "RTP-Info" header tells us what the seqno and rtptime of the next + // audio packet will be, should playback resume + int seq; + unsigned long rtp_tsp; + unsigned long rtptime; + if (!rtsp_playing()) return; - player_flush(); + + char *hdr = msg_get_header(req, "RTP-Info"); + if (!hdr) + return; + char *p; + p = strstr(hdr, "seq="); + if (!p) + return; + p = strchr(p, '=') + 1; + seq = atoi(p); + p = strstr(hdr, "rtptime="); + if (!p) + return; + p = strchr(p, '=') + 1; + rtptime = strtoul(p, NULL, 0); + debug(1, "Received seq: %04X, rtptime: %lu\n", seq, rtptime); + + rtp_tsp = player_flush(seq, rtptime); + char *resphdr = malloc(32); + sprintf(resphdr, "rtptime=%lu", rtp_tsp); + debug(1, "Reporting RTP-Info: %s\n", resphdr); + msg_add_header(resp, "RTP-Info", resphdr); resp->respcode = 200; + + free(resphdr); } static void handle_setup(rtsp_conn_info *conn, @@ -424,6 +453,43 @@ static void handle_setup(rtsp_conn_info *conn, resp->respcode = 200; } +static void handle_record(rtsp_conn_info *conn, + rtsp_message *req, rtsp_message *resp) { + // most clients will add a "RTP-Info" header, so we know the first + // audio packet's seqno and rtptime. + // if there's no "RTP-Info" header, we go into a loose RTP mode + int seq = -1; + unsigned long rtptime = 0; + int rtp_mode = 0; + char *hdr = msg_get_header(req, "RTP-Info"); + if (hdr) { + char *p; + p = strstr(hdr, "seq="); + if (!p) + return; + p = strchr(p, '=') + 1; + seq = atoi(p); + p = strstr(hdr, "rtptime="); + if (!p) + return; + p = strchr(p, '=') + 1; + rtptime = strtoul(p, NULL, 0); + rtp_mode = 1; + } + debug(1, "Received seq: %04X, rtptime: %lu\n", seq, rtptime); + rtp_record(rtp_mode); + player_flush(seq, rtptime); + + // note: it is assumed we're supposed to return the delay in ms + char *resphdr = malloc(10); + sprintf(resphdr, "%d", config.delay/1000); + debug(1, "Reporting %sms delay\n", resphdr); + msg_add_header(resp, "Audio-Latency", resphdr); + resp->respcode = 200; + + free(resphdr); +} + static void handle_ignore(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { resp->respcode = 200; @@ -620,7 +686,7 @@ static struct method_handler { {"SETUP", handle_setup}, {"GET_PARAMETER", handle_ignore}, {"SET_PARAMETER", handle_set_parameter}, - {"RECORD", handle_ignore}, + {"RECORD", handle_record}, {NULL, NULL} }; diff --git a/shairport.c b/shairport.c index 8a3f89d39..fc9acc195 100644 --- a/shairport.c +++ b/shairport.c @@ -93,8 +93,8 @@ void usage(char *progname) { printf(" -p, --port=PORT set RTSP listening port\n"); printf(" -a, --name=NAME set advertised name\n"); printf(" -k, --password=PW require password to stream audio\n"); - printf(" -b FILL set how full the buffer must be before audio output\n"); - printf(" starts. This value is in frames; default %d\n", config.buffer_start_fill); + printf(" -t, --delay=TIME set by how much audio is delayed.\n"); + printf(" This value is in ms; default %d\n", config.delay/1000); printf(" -d, --daemon fork (daemonise). The PID of the child process is\n"); printf(" written to stdout, unless a pidfile is used.\n"); printf(" -P, --pidfile=FILE write daemon's pid to FILE on startup.\n"); @@ -138,12 +138,13 @@ int parse_options(int argc, char **argv) { {"wait-cmd", no_argument, NULL, 'w'}, {"meta-dir", required_argument, NULL, 'M'}, {"mdns", required_argument, NULL, 'm'}, + {"delay", required_argument, NULL, 't'}, {NULL, 0, NULL, 0} }; int opt; while ((opt = getopt_long(argc, argv, - "+hdvP:l:e:p:a:k:o:b:B:E:M:wm:", + "+hdvP:l:e:p:a:k:o:t:B:E:M:wm:", long_options, NULL)) > 0) { switch (opt) { default: @@ -170,8 +171,8 @@ int parse_options(int argc, char **argv) { case 'k': config.password = optarg; break; - case 'b': - config.buffer_start_fill = atoi(optarg); + case 't': + config.delay = atoi(optarg) * 1000; break; case 'B': config.cmd_start = optarg; @@ -278,7 +279,7 @@ int main(int argc, char **argv) { memset(&config, 0, sizeof(config)); // set defaults - config.buffer_start_fill = 220; + config.delay = 2205000; //todo: check with an airport express what this should be config.port = 5002; char hostname[100]; gethostname(hostname, 100); From d6db8103d9de354abd6f78f01fba755f4301e53c Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Sat, 15 Feb 2014 20:30:32 +0100 Subject: [PATCH 05/19] add support for generic sinks --- audio.c | 36 ++++++++++++++++++++++++++++++++++++ audio.h | 2 ++ player.c | 6 ++++++ 3 files changed, 44 insertions(+) diff --git a/audio.c b/audio.c index 95bedc2c1..0ec8c22f3 100644 --- a/audio.c +++ b/audio.c @@ -28,6 +28,9 @@ #include #include "audio.h" #include "config.h" +#include "common.h" +#include "player.h" +#include "rtp.h" #ifdef CONFIG_SNDIO extern audio_output audio_sndio; @@ -61,6 +64,7 @@ static audio_output *outputs[] = { NULL }; +long long audio_delay; audio_output *audio_get_output(char *name) { audio_output **out; @@ -89,3 +93,35 @@ void audio_ls_outputs(void) { (*out)->help(); } } + +//gets called for generic outputs +long long audio_get_delay(void) { + return audio_delay; +} + +void audio_estimate_delay(audio_output *output) { + signed short *silence; + int frame_size = 200; + long long base_time, cur_time, last_time, frame_time, frame_time_limit; + + silence = malloc(4 * frame_size); + memset(silence, 0, 4 * frame_size); + frame_time = (frame_size * 1000000) / 44100; + frame_time_limit = frame_time * 2 / 3; + + base_time = tstp_us(); + last_time = base_time; + int loop = 0; + while (loop < 1000) { + output->play(silence, frame_size); + cur_time = tstp_us(); + if ((cur_time - last_time) > frame_time_limit) + break; + last_time = cur_time; + loop++; + } + debug(3, "totaltime %lld, last loop time %lld, loop %d\n", last_time-base_time,cur_time - last_time, loop); + audio_delay = (loop * frame_time) - (last_time - base_time); + debug(2,"Generic output delay %lld\n", audio_delay); + free(silence); +} diff --git a/audio.h b/audio.h index 1edbd0be1..729fe6819 100644 --- a/audio.h +++ b/audio.h @@ -24,5 +24,7 @@ typedef struct { audio_output *audio_get_output(char *name); void audio_ls_outputs(void); +long long audio_get_delay(void); +void audio_estimate_delay(audio_output *output); #endif //_AUDIO_H diff --git a/player.c b/player.c index ccc77a7c1..75b4fe1b5 100644 --- a/player.c +++ b/player.c @@ -670,6 +670,12 @@ int player_play(stream_cfg *stream) { please_stop = 0; command_start(); config.output->start(sampling_rate); + // generic outputs cannot report the delay, so we estimate the buffer depth + // at startup and hope for the best + if (!config.output->get_delay) { + config.output->get_delay = audio_get_delay; + audio_estimate_delay(config.output); + } pthread_create(&player_thread, NULL, player_thread_func, NULL); return 0; From 0dc98bed544693b6318056b8a9268be99778b605 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Thu, 19 Jun 2014 17:51:51 +0200 Subject: [PATCH 06/19] remove unused functions --- common.h | 1 - player.c | 91 -------------------------------------------------------- 2 files changed, 92 deletions(-) diff --git a/common.h b/common.h index 014c6a300..4519df569 100644 --- a/common.h +++ b/common.h @@ -26,7 +26,6 @@ typedef struct { audio_output *output; char *mdns_name; mdns_backend *mdns; - int buffer_start_fill; int delay; int daemonise; char *cmd_start, *cmd_stop; diff --git a/player.c b/player.c index 75b4fe1b5..0525b7612 100644 --- a/player.c +++ b/player.c @@ -103,8 +103,6 @@ static seq_t ab_read, ab_write; static int ab_buffering = 1, ab_synced = 0; static pthread_mutex_t ab_mutex = PTHREAD_MUTEX_INITIALIZER; -static void bf_est_reset(short fill); - static void ab_resync(void) { int i; for (i=0; i>16; } -typedef struct { - double hist[2]; - double a[2]; - double b[3]; -} biquad_t; - -static void biquad_init(biquad_t *bq, double a[], double b[]) { - bq->hist[0] = bq->hist[1] = 0.0; - memcpy(bq->a, a, 2*sizeof(double)); - memcpy(bq->b, b, 3*sizeof(double)); -} - -static void biquad_lpf(biquad_t *bq, double freq, double Q) { - double w0 = 2.0 * M_PI * freq * frame_size / (double)sampling_rate; - double alpha = sin(w0)/(2.0*Q); - - double a_0 = 1.0 + alpha; - double b[3], a[2]; - b[0] = (1.0-cos(w0))/(2.0*a_0); - b[1] = (1.0-cos(w0))/a_0; - b[2] = b[0]; - a[0] = -2.0*cos(w0)/a_0; - a[1] = (1-alpha)/a_0; - - biquad_init(bq, a, b); -} - -static double biquad_filt(biquad_t *bq, double in) { - double w = in - bq->a[0]*bq->hist[0] - bq->a[1]*bq->hist[1]; - double out = bq->b[1]*bq->hist[0] + bq->b[2]*bq->hist[1] + bq->b[0]*w; - bq->hist[1] = bq->hist[0]; - bq->hist[0] = w; - - return out; -} - -static double bf_playback_rate = 1.0; - -static double bf_est_drift = 0.0; // local clock is slower by -static biquad_t bf_drift_lpf; -static double bf_est_err = 0.0, bf_last_err; -static biquad_t bf_err_lpf, bf_err_deriv_lpf; -static double desired_fill; -static int fill_count; - -static void bf_est_reset(short fill) { - biquad_lpf(&bf_drift_lpf, 1.0/180.0, 0.3); - biquad_lpf(&bf_err_lpf, 1.0/10.0, 0.25); - biquad_lpf(&bf_err_deriv_lpf, 1.0/2.0, 0.2); - fill_count = 0; - bf_playback_rate = 1.0; - bf_est_err = bf_last_err = 0; - desired_fill = fill_count = 0; -} - -static void bf_est_update(short fill) { - // the rate-matching system needs to decide how full to keep the buffer. - // the initial fill is present when the system starts to output samples, - // but most output chains will instantly gobble their own buffer's worth of - // data. we average for a while to decide where to draw the line. - if (fill_count < 1000) { - desired_fill += (double)fill/1000.0; - fill_count++; - return; - } else if (fill_count == 1000) { - // this information could be used to help estimate our effective latency? - debug(1, "established desired fill of %f frames, " - "so output chain buffered about %f frames\n", desired_fill, - config.buffer_start_fill - desired_fill); - fill_count++; - } - -#define CONTROL_A (1e-4) -#define CONTROL_B (1e-1) - - double buf_delta = fill - desired_fill; - bf_est_err = biquad_filt(&bf_err_lpf, buf_delta); - double err_deriv = biquad_filt(&bf_err_deriv_lpf, bf_est_err - bf_last_err); - double adj_error = CONTROL_A * bf_est_err; - - bf_est_drift = biquad_filt(&bf_drift_lpf, CONTROL_B*(adj_error + err_deriv) + bf_est_drift); - - debug(3, "bf %d err %f drift %f desiring %f ed %f estd %f\n", - fill, bf_est_err, bf_est_drift, desired_fill, err_deriv, err_deriv + adj_error); - bf_playback_rate = 1.0 + adj_error + bf_est_drift; - - bf_last_err = bf_est_err; -} - // get the next frame, when available. return 0 if underrun/stream reset. static short *buffer_get_frame(sync_cfg *sync_tag) { int16_t buf_fill; From ae8a31243d3e1e816b7a6f78317329783d4de7ef Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Fri, 20 Jun 2014 18:34:33 +0200 Subject: [PATCH 07/19] fix: First check if the tools configure needs are installed --- configure | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/configure b/configure index eca59373d..f535aaa19 100755 --- a/configure +++ b/configure @@ -89,6 +89,21 @@ FUNC_EOF rm -f function_test.* } +# Check if command is installed +check_command() +{ +if command -v $1 > /dev/null 2>&1; then + echo $1 'is installed' +else + echo $1 'is missing, cannot continue' + exit 1 +fi +} + +# First check if the tools configure needs are installed +check_command pkg-config +check_command ${CC} + check_time_function do_pkg_config OpenSSL openssl From 646ab09c489cd351cdee5b568d31fe996b7876f3 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Fri, 20 Jun 2014 18:35:19 +0200 Subject: [PATCH 08/19] fix: gcc might not be the default c compiler --- configure | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configure b/configure index f535aaa19..381ae6fa2 100755 --- a/configure +++ b/configure @@ -1,6 +1,6 @@ #!/bin/sh -[ -z "${CC}" ] && CC=gcc +[ -z "${CC}" ] && CC=cc echo Configuring Shairport From 53548e16a614a94366b9ea04f40644d19f947da9 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Fri, 20 Jun 2014 18:37:55 +0200 Subject: [PATCH 09/19] fix: on a FreeBSD machine, assume OpenSSL is there On FreeBSD, OpenSSL is always there, but no .pc file is installed, so pkg-config does not report it --- configure | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/configure b/configure index 381ae6fa2..56be4a86c 100755 --- a/configure +++ b/configure @@ -106,7 +106,14 @@ check_command ${CC} check_time_function -do_pkg_config OpenSSL openssl +# On FreeBSD, OpenSSL is always there, but no .pc file is installed, +# so pkg-config does not report it +if [ `uname` = 'FreeBSD' ]; then + echo "FreeBSD machine, assuming OpenSSL is there" + LDFLAGS="${LDFLAGS} -lssl -lcrypto" +else + do_pkg_config OpenSSL openssl +fi do_pkg_config libao ao CONFIG_AO do_pkg_config PulseAudio libpulse-simple CONFIG_PULSE do_pkg_config ALSA alsa CONFIG_ALSA From 6ff1253cfc675d7b96cb8b10b3cbd27c1ae722e4 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Tue, 3 Jun 2014 19:29:15 +0200 Subject: [PATCH 10/19] fix: only sleep if delay is positive --- audio_dummy.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/audio_dummy.c b/audio_dummy.c index 24007d53c..296a08044 100644 --- a/audio_dummy.c +++ b/audio_dummy.c @@ -59,9 +59,9 @@ static void play(short buf[], int samples) { samples_played += samples; - long long finishtime = starttime + samples_played * 1e6 / Fs; - - usleep(finishtime - nowtime); + long long sleeptime = starttime + samples_played * 1e6 / Fs - nowtime; + if (sleeptime > 0) + usleep(sleeptime ); } static void stop(void) { From 62bf8ad7953fc819f2fe60710e870800d7240517 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Tue, 3 Jun 2014 19:31:14 +0200 Subject: [PATCH 11/19] fix: first shutdown player, otherwise this hangs shairport on OpenBSD --- rtsp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rtsp.c b/rtsp.c index d27c30e0c..fd68a1f63 100644 --- a/rtsp.c +++ b/rtsp.c @@ -886,8 +886,8 @@ static void *rtsp_conversation_thread_func(void *pconn) { if (conn->fd > 0) close(conn->fd); if (rtsp_playing()) { - rtp_shutdown(); player_stop(); + rtp_shutdown(); please_shutdown = 0; pthread_mutex_unlock(&playing_mutex); } From 788107e06cc2cf57fb788f46e343b77cfd2f1259 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Sat, 28 Jun 2014 17:26:58 +0200 Subject: [PATCH 12/19] fix: lower base sample for interpolated sample could be out-of-bounds --- player.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/player.c b/player.c index 0525b7612..9c8ad955f 100644 --- a/player.c +++ b/player.c @@ -374,7 +374,7 @@ static int stuff_buffer(double playback_rate, short *inptr, short *outptr) { if (rand() < p_stuff * RAND_MAX) { stuff = playback_rate > 1.0 ? -1 : 1; - stuffsamp = rand() % (frame_size - 1); + stuffsamp = 1 + (rand() % (frame_size - 2)); } pthread_mutex_lock(&vol_mutex); From 021d3590c37ada7d44ff4fae05f2bd4b6e95d83a Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Wed, 2 Jul 2014 17:30:43 +0200 Subject: [PATCH 13/19] Alsa: limit buffer size, so shairport can start in time increase period size 64 to as big as HW supports: a small period only makes sense for extremely low latency applications. due to shairport's internal 352 buffer size it makes even less sense --- audio_alsa.c | 52 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/audio_alsa.c b/audio_alsa.c index 65cc63be6..4ac67b622 100644 --- a/audio_alsa.c +++ b/audio_alsa.c @@ -160,7 +160,7 @@ static void start(int sample_rate) { device_sample_rate = sample_rate; int ret, dir = 0; - snd_pcm_uframes_t frames = 64; + snd_pcm_uframes_t period_size, buffer_size; ret = snd_pcm_open(&alsa_handle, alsa_out_dev, SND_PCM_STREAM_PLAYBACK, 0); if (ret < 0) die("Alsa initialization failed: unable to open pcm device: %s\n", snd_strerror(ret)); @@ -171,10 +171,52 @@ static void start(int sample_rate) { snd_pcm_hw_params_set_format(alsa_handle, alsa_params, SND_PCM_FORMAT_S16); snd_pcm_hw_params_set_channels(alsa_handle, alsa_params, 2); snd_pcm_hw_params_set_rate_near(alsa_handle, alsa_params, (unsigned int *)&sample_rate, &dir); - snd_pcm_hw_params_set_period_size_near(alsa_handle, alsa_params, &frames, &dir); - ret = snd_pcm_hw_params(alsa_handle, alsa_params); - if (ret < 0) - die("unable to set hw parameters: %s\n", snd_strerror(ret)); + + // setting period and buffer is a simplified version of what XBMC does + snd_pcm_hw_params_get_period_size_max(alsa_params, &period_size, NULL); + snd_pcm_hw_params_get_buffer_size_max(alsa_params, &buffer_size); + debug(1, "Hardware supports period_size_max: %d, buffer_size_max: %d\n", period_size, buffer_size); + + // we want about 333 ms of buffer, and 50ms period + // buffer might still need some tweaking to get reliable operation on RPi + USB DAC... + // make sure we do not exceed what HW supports + period_size = (period_size < sample_rate / 20 ? period_size : sample_rate / 20); + buffer_size = (buffer_size < sample_rate / 3 ? buffer_size : sample_rate / 3); + + // make sure buffer size is at least 4 times period size + period_size = (period_size < buffer_size / 4 ? period_size : buffer_size / 4); + debug(1, "Trying to set period_size: %d, buffer_size: %d\n", period_size, buffer_size); + + // we keep the originals and try setting period and buffer using copies + snd_pcm_uframes_t period_temp, buffer_temp; + period_temp = period_size; + buffer_temp = buffer_size; + snd_pcm_hw_params_t *alsa_params_copy; + snd_pcm_hw_params_alloca(&alsa_params_copy); + snd_pcm_hw_params_copy(alsa_params_copy, alsa_params); + + // some HW seems to be picky about the order period and buffer are set, so try both ways + // first try with buffer_size, period_size + if (snd_pcm_hw_params_set_buffer_size_near(alsa_handle, alsa_params_copy, &buffer_temp) != 0 + || snd_pcm_hw_params_set_period_size_near(alsa_handle, alsa_params_copy, &period_temp, NULL) != 0 + || snd_pcm_hw_params(alsa_handle, alsa_params_copy) != 0) { + period_temp = period_size; + buffer_temp = buffer_size; + snd_pcm_hw_params_copy(alsa_params_copy, alsa_params); + // retry with period_size, buffer_size + if (snd_pcm_hw_params_set_period_size_near(alsa_handle, alsa_params_copy, &period_temp, NULL) != 0 + || snd_pcm_hw_params_set_buffer_size_near(alsa_handle, alsa_params_copy, &buffer_temp) != 0 + || snd_pcm_hw_params(alsa_handle, alsa_params_copy) != 0) { + // set what alsa would have + warn("Setting period and buffer failed, going with the defaults\n"); + ret = snd_pcm_hw_params(alsa_handle, alsa_params); + if (ret < 0) + die("unable to set hw parameters: %s\n", snd_strerror(ret)); + // using alsa defaults, so see what they are + snd_pcm_get_params(alsa_handle, &buffer_size, &period_size); + debug(1, "Defaults are period_size: %d, buffer_size: %d\n", period_size, buffer_size); + } + } } static void play(short buf[], int samples) { From f377a617bf69b93f2b1db725ca81d049207b34fc Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Wed, 6 Aug 2014 22:33:16 +0200 Subject: [PATCH 14/19] rework ntp offset calculation --- rtp.c | 160 ++++++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 128 insertions(+), 32 deletions(-) diff --git a/rtp.c b/rtp.c index a9e8bd1a0..f0b4b3610 100644 --- a/rtp.c +++ b/rtp.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -41,7 +42,18 @@ #include #endif -#define NTPCACHESIZE 7 +// remote clock related stuff +#define MAXCACHESIZE 8 + +typedef struct { + long long offset; + long long arrival_time; + long long delay; +} ntp_offset_t; +static ntp_offset_t ntp_cache[MAXCACHESIZE]; +static long long ntp_offset; +static int ntp_cache_size; +static float drift_est; // only one RTP session can be active at a time. static int running = 0; @@ -55,7 +67,6 @@ static int timing_sock; static pthread_t rtp_thread; static pthread_t ntp_receive_thread; static pthread_t ntp_send_thread; -long long ntp_cache[NTPCACHESIZE + 1]; static int strict_rtp; void rtp_record(int rtp_mode){ @@ -81,52 +92,132 @@ static void get_current_time(struct timespec *tsp) { } static void reset_ntp_cache() { - int i; - for (i = 0; i < NTPCACHESIZE; i++) { - ntp_cache[i] = LLONG_MIN; - } - ntp_cache[NTPCACHESIZE] = 0; + ntp_offset = 0; + ntp_cache_size = 0; + drift_est = 0.0; } long long get_ntp_offset() { - return ntp_cache[NTPCACHESIZE]; + return ntp_offset; } -static void update_ntp_cache(long long offset, long long arrival_time) { - // average the offsets, filter out outliers - int i, d, minindex, maxindex; - long long total; +static void line_fit(float xvalue[], float yvalue[], int number, float *a, float *b) { + int i; + float sumx=0, sumy=0, sumxy=0, sumx2=0; + float productxy[MAXCACHESIZE], square[MAXCACHESIZE]; + float denominator; + + for(i=0;i 0; i--) { + //debug(1, "from %d to %d\n", i-1, i); + ntp_cache[i].offset = ntp_cache[i-1].offset; + ntp_cache[i].arrival_time = ntp_cache[i-1].arrival_time; + ntp_cache[i].delay = ntp_cache[i-1].delay; + } - for (i = 0; i < (NTPCACHESIZE - 1); i++) { - ntp_cache[i] = ntp_cache[i+1]; + // shift in current sample + ntp_cache[0].offset = offset; + ntp_cache[0].arrival_time = arrival_time; + ntp_cache[0].delay = delay; + + if (ntp_cache_size < MAXCACHESIZE) + ntp_cache_size++; + + // ntp's loop filter prefers time stamps with low round trip delay, we do too + // time stamps with a high round trip delay probably got 'stuck' somewhere in the loop + // and are thus not reliable + // some semantics are applied to eliminate rogue time stamps: + // minimum time stamp + 33% + 3000ms + // every time stamp with a delay below this limit is considered valid + mindelayindex = 0; + long long mindelay = delay; + for (i = 1; i < ntp_cache_size; i++) { + mindelayindex = (ntp_cache[i].delay < mindelay ? i : mindelayindex); } - ntp_cache[NTPCACHESIZE - 1] = offset; + mindelay = ntp_cache[mindelayindex].delay; + debug(3, "ntp: low: %lld lowest delay: %lld at index %d \n", ntp_cache[mindelayindex].offset, mindelay, mindelayindex); + delaylimit = (ntp_cache[mindelayindex].delay / 3) * 4 + 3000LL; + debug(2, "mindelay: %lld, delaylimit: %lld\n", mindelay, delaylimit); + // estimate drift using line fit d = 0; - minindex = 0; - maxindex = 0; - for (i = 0; i < NTPCACHESIZE; i++) { - if (ntp_cache[i] != LLONG_MIN) { + a = b = 0.0; + for (i = 0; i < ntp_cache_size; i++) { + if (ntp_cache[i].delay < delaylimit) { + yvalue[d] = (float)(ntp_cache[i].offset - ntp_cache[mindelayindex].offset); + xvalue[d] = (float)(ntp_cache[i].arrival_time - arrival_time); d++; - minindex = (ntp_cache[i] < ntp_cache[minindex] ? i : minindex); - maxindex = (ntp_cache[i] > ntp_cache[maxindex] ? i : maxindex); } + debug(3, "ntp[%d]: %lld, delay: %lld, d: %d\n", i, ntp_cache[i].offset, ntp_cache[i].delay, d); + } + if (d > 4) { + line_fit(xvalue, yvalue, d, &a, &b); + + float yerror=0.0; + for(i = 0; i < d; i++) { + yerror = fmax(yerror, fabs(yvalue[i] - (b * xvalue[i] + a))); + } + debug(2, "max yerror: %0.1f\n", yerror); + if (yerror < 1000.0) + drift_est = drift_est * 0.8 + b * 0.2; + else + debug(2, "Not updating drift estimate: yerror too big: %0.1f\n", yerror); } - debug(2, "ntp: valid entries: %d\n", d); - if (d < 5) - minindex = maxindex = -1; + debug(2, "drift_est: %f, b: %f\n", drift_est, b); + + // 'good' delay avg + // take complete cache + totaloffset = 0; + totaldelay = 0; d = 0; - total = 0; - for (i = 0; i < NTPCACHESIZE; i++) { - debug(3, "ntp[%d]: %lld, d: %d\n", i, ntp_cache[i] , d); - if ((ntp_cache[i] != LLONG_MIN) && (i != minindex) && (i != maxindex)) { + for (i = 0; i < ntp_cache_size; i++) { + if (ntp_cache[i].delay < delaylimit) { + totaloffset += ntp_cache[i].offset - ntp_cache[mindelayindex].offset; + totaldelay += arrival_time - ntp_cache[i].arrival_time; d++; - total += ntp_cache[i]; } + debug(3, "ntp[%d]: %lld, delay: %lld, d: %d, total: %lld\n", i, ntp_cache[i].offset, ntp_cache[i].delay, d, totaloffset); } - ntp_cache[NTPCACHESIZE] = total / d; - debug(2, "ntp: offset: %lld, d: %d\n", ntp_cache[NTPCACHESIZE], d); + + totaldelay /= d; + ntp_offset = (totaloffset / d) + ntp_cache[mindelayindex].offset; + debug(3, "ntp: offset: %lld, d: %d, delay %lld\n", ntp_offset, d, totaldelay); + totaldelay = (long long)(drift_est * totaldelay); + ntp_offset += totaldelay; + debug(2,"ntp: mean: %lld, bump: %lld\n", ntp_offset, totaldelay); } static long long tspk_to_us(struct timespec tspk) { @@ -262,6 +353,7 @@ static void *ntp_receiver(void *arg) { long long ntp_loc_tsp = tspk_to_us(tv); debug(2, "Timing packet ntp_loc_tsp %lld\n", ntp_loc_tsp); + // from the ntp spec: // d = (t4 - t1) - (t3 - t2) and c = (t2 - t1 + t3 - t4)/2 long long d = (ntp_loc_tsp - ntp_ref_tsp) - (ntp_sen_tsp - ntp_rec_tsp); @@ -269,7 +361,11 @@ static void *ntp_receiver(void *arg) { debug(2, "Round-trip delay %lld us\n", d); debug(2, "Clock offset %lld us\n", c); - update_ntp_cache(c, ntp_loc_tsp); + // making sure the other side's clock is sane + if ((ntp_rec_tsp <= ntp_sen_tsp) && (d >= 0)) + update_ntp_cache(c, ntp_loc_tsp, d); + else + debug(1, "Remote clock is acting weird\n"); continue; } From 007988f02a90b08552f96cbc2796856bb6fcde5d Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Wed, 6 Aug 2014 22:34:19 +0200 Subject: [PATCH 15/19] rework playback rate calculation --- player.c | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/player.c b/player.c index 9c8ad955f..a2da32329 100644 --- a/player.c +++ b/player.c @@ -403,16 +403,20 @@ static int stuff_buffer(double playback_rate, short *inptr, short *outptr) { return frame_size + stuff; } -//constant first-order filter -#define ALPHA 0.945 -#define LOSS 850000.0 +// constant first-order filter +#define ALPHA 0.05 +#define LOSS 900000.0 + +#define SYNCS_PER_S 5 static double bf_playback_rate = 1.0; static void *player_thread_func(void *arg) { int play_samples = frame_size; + int buf_count = -1; + int sync_count; sync_cfg sync_tag; - long long sync_time; + long long sync_time, last_ntp_tsp=0; double sync_time_diff = 0.0; long sync_frames = 0; state = BUFFERING; @@ -422,6 +426,9 @@ static void *player_thread_func(void *arg) { inbuf = silence = malloc(OUTFRAME_BYTES(frame_size)); memset(silence, 0, OUTFRAME_BYTES(frame_size)); + sync_count = 1 + (sampling_rate / (frame_size * SYNCS_PER_S)); + debug(2, "sync_count: %d\n", sync_count); + #ifdef FANCY_RESAMPLING float *frame, *outframe; SRC_DATA srcdat; @@ -506,10 +513,16 @@ static void *player_thread_func(void *arg) { play_samples = srcdat.output_frames_gen; } else #endif + if (buf_count > -1) + buf_count++; if (sync_tag.sync_mode == NTPSYNC) { + buf_count = 0; + last_ntp_tsp = sync_tag.ntp_tsp; + } + if (buf_count % sync_count == 0) { //check if we're still in sync. - sync_time = get_sync_time(sync_tag.ntp_tsp); - sync_time_diff = (ALPHA * sync_time_diff) + (1.0- ALPHA) * (double)sync_time; + sync_time = get_sync_time(last_ntp_tsp + (long long)buf_count * (long long)frame_size * 1000000LL / (long long)sampling_rate); + sync_time_diff = (ALPHA * (double)sync_time) + (1.0 - ALPHA) * sync_time_diff; bf_playback_rate = 1.0 - (sync_time_diff / LOSS); debug(2, "Playback rate %f, sync_time %lld\n", bf_playback_rate, sync_time); } From ef3ac89d08d65050d0c0157fcfab2af0c6f5dd12 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Tue, 2 Sep 2014 21:43:18 +0200 Subject: [PATCH 16/19] simplify and improve ntp calculation --- rtp.c | 134 +++++++++++++++++++++++----------------------------------- 1 file changed, 54 insertions(+), 80 deletions(-) diff --git a/rtp.c b/rtp.c index f0b4b3610..a7b737313 100644 --- a/rtp.c +++ b/rtp.c @@ -51,9 +51,10 @@ typedef struct { long long delay; } ntp_offset_t; static ntp_offset_t ntp_cache[MAXCACHESIZE]; -static long long ntp_offset; -static int ntp_cache_size; -static float drift_est; +static long long ntp_offset, ntp_offset_error; +static int ntp_cache_size, drift_est_size; +static float drift_est, drift_est_lp; +static long long last_update_time; // only one RTP session can be active at a time. static int running = 0; @@ -93,51 +94,21 @@ static void get_current_time(struct timespec *tsp) { static void reset_ntp_cache() { ntp_offset = 0; + ntp_offset_error = 0; ntp_cache_size = 0; + drift_est_size = 0; drift_est = 0.0; + last_update_time = 0; + drift_est_lp = 0.0; } long long get_ntp_offset() { return ntp_offset; } - -static void line_fit(float xvalue[], float yvalue[], int number, float *a, float *b) { - int i; - float sumx=0, sumy=0, sumxy=0, sumx2=0; - float productxy[MAXCACHESIZE], square[MAXCACHESIZE]; - float denominator; - - for(i=0;i= delaylimit) + i++; + debug(2, "ntp: raw: %lld, local: %lld\n", ntp_cache[i].offset, arrival_time); + + // predict offset + // if the time since the last timestamp is short, we are still starting up: just average + double time_elapsed; + long long last_offset = ntp_offset; + time_elapsed = (double)(arrival_time - last_update_time); + if ((time_elapsed < 1000000.0) || (ntp_cache_size <= 3)) { + totaloffset = 0; + d = 0; + for (i = 0; i < ntp_cache_size; i++) { + if (ntp_cache[i].delay < delaylimit) { + totaloffset += ntp_cache[i].offset - ntp_cache[mindelayindex].offset; + d++; + } + debug(2, "ntp[%d]: %lld, d: %d, total: %lld\n", i, ntp_cache[i].offset, d, totaloffset); } - debug(3, "ntp[%d]: %lld, delay: %lld, d: %d\n", i, ntp_cache[i].offset, ntp_cache[i].delay, d); - } - if (d > 4) { - line_fit(xvalue, yvalue, d, &a, &b); - - float yerror=0.0; - for(i = 0; i < d; i++) { - yerror = fmax(yerror, fabs(yvalue[i] - (b * xvalue[i] + a))); + ntp_offset = (totaloffset / d) + ntp_cache[mindelayindex].offset; + } else { + // update the drift estimate + if (delay < delaylimit) { + double alpha; + i = ntp_cache_size; + // find oldest good time stamp + while (ntp_cache[i].delay >= delaylimit) + i--; + if (drift_est_size < 16) + drift_est_size++; + // progressively stiffen the lpf + alpha = 1.0 / drift_est_size; + drift_est_lp = ((double)(offset - last_offset) / time_elapsed) * alpha + drift_est_lp * (1 - alpha); } - debug(2, "max yerror: %0.1f\n", yerror); - if (yerror < 1000.0) - drift_est = drift_est * 0.8 + b * 0.2; - else - debug(2, "Not updating drift estimate: yerror too big: %0.1f\n", yerror); - } - debug(2, "drift_est: %f, b: %f\n", drift_est, b); - - // 'good' delay avg - // take complete cache - totaloffset = 0; - totaldelay = 0; - d = 0; - for (i = 0; i < ntp_cache_size; i++) { - if (ntp_cache[i].delay < delaylimit) { - totaloffset += ntp_cache[i].offset - ntp_cache[mindelayindex].offset; - totaldelay += arrival_time - ntp_cache[i].arrival_time; - d++; + + // estimate the next timestamp + ntp_offset += (long long)(time_elapsed * drift_est_lp); + + // now calculate error integral and compensate for long-term drift + if (delay < delaylimit) { + ntp_offset_error += offset - ntp_offset; } - debug(3, "ntp[%d]: %lld, delay: %lld, d: %d, total: %lld\n", i, ntp_cache[i].offset, ntp_cache[i].delay, d, totaloffset); + ntp_offset += ntp_offset_error / 128; + debug(2, "est bump: %lld, %lld\n", (long long)(time_elapsed * drift_est_lp), ntp_offset - last_offset); } - - totaldelay /= d; - ntp_offset = (totaloffset / d) + ntp_cache[mindelayindex].offset; - debug(3, "ntp: offset: %lld, d: %d, delay %lld\n", ntp_offset, d, totaldelay); - totaldelay = (long long)(drift_est * totaldelay); - ntp_offset += totaldelay; - debug(2,"ntp: mean: %lld, bump: %lld\n", ntp_offset, totaldelay); + last_update_time = arrival_time; + debug(2,"ntp: pre: %lld, error: %lld, drift_est_lp: %f, %lld\n", ntp_offset, ntp_offset_error, drift_est_lp, ((double)(offset - last_offset) / time_elapsed)); } static long long tspk_to_us(struct timespec tspk) { From fa282da558bebeb94641b6b761b5eed9129fb140 Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Sun, 7 Sep 2014 15:24:34 +0200 Subject: [PATCH 17/19] interpolate remote time --- player.c | 6 ------ rtp.c | 24 ++++++++++++++++-------- rtp.h | 2 +- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/player.c b/player.c index a2da32329..cad5abddd 100644 --- a/player.c +++ b/player.c @@ -218,12 +218,6 @@ static long us_to_frames(long long us) { return us * sampling_rate / 1000000; } -static inline long long get_sync_time(long long ntp_tsp) { - long long sync_time_est; - sync_time_est = (ntp_tsp + config.delay) - (tstp_us() + get_ntp_offset() + config.output->get_delay()); - return sync_time_est; -} - void player_put_packet(seq_t seqno, sync_cfg sync_tag, uint8_t *data, int len) { abuf_t *abuf = 0; int16_t buf_fill; diff --git a/rtp.c b/rtp.c index a7b737313..c1373129d 100644 --- a/rtp.c +++ b/rtp.c @@ -55,6 +55,7 @@ static long long ntp_offset, ntp_offset_error; static int ntp_cache_size, drift_est_size; static float drift_est, drift_est_lp; static long long last_update_time; +static pthread_mutex_t time_mutex = PTHREAD_MUTEX_INITIALIZER; // only one RTP session can be active at a time. static int running = 0; @@ -102,10 +103,6 @@ static void reset_ntp_cache() { drift_est_lp = 0.0; } -long long get_ntp_offset() { - return ntp_offset; -} - static void update_ntp_cache(long long offset, long long arrival_time, long long delay) { int i, d, mindelayindex; long long delaylimit, totaloffset; @@ -154,6 +151,7 @@ static void update_ntp_cache(long long offset, long long arrival_time, long long double time_elapsed; long long last_offset = ntp_offset; time_elapsed = (double)(arrival_time - last_update_time); + pthread_mutex_lock(&time_mutex); if ((time_elapsed < 1000000.0) || (ntp_cache_size <= 3)) { totaloffset = 0; d = 0; @@ -169,10 +167,6 @@ static void update_ntp_cache(long long offset, long long arrival_time, long long // update the drift estimate if (delay < delaylimit) { double alpha; - i = ntp_cache_size; - // find oldest good time stamp - while (ntp_cache[i].delay >= delaylimit) - i--; if (drift_est_size < 16) drift_est_size++; // progressively stiffen the lpf @@ -191,6 +185,7 @@ static void update_ntp_cache(long long offset, long long arrival_time, long long debug(2, "est bump: %lld, %lld\n", (long long)(time_elapsed * drift_est_lp), ntp_offset - last_offset); } last_update_time = arrival_time; + pthread_mutex_unlock(&time_mutex); debug(2,"ntp: pre: %lld, error: %lld, drift_est_lp: %f, %lld\n", ntp_offset, ntp_offset_error, drift_est_lp, ((double)(offset - last_offset) / time_elapsed)); } @@ -217,6 +212,19 @@ static long long ntp_tsp_to_us(uint32_t timestamp_hi, uint32_t timestamp_lo) { return timetemp; } +long long get_sync_time(long long ntp_tsp) { + // time lock should be acquired in this function? + long long sync_time_est, local_time, remote_time, delay_to_out; + pthread_mutex_lock(&time_mutex); + local_time = tstp_us(); + delay_to_out= config.output->get_delay(); + remote_time = ntp_offset + (double)(local_time - last_update_time) * drift_est_lp + local_time; + //sync_time_est = (ntp_tsp + config.delay) - (tstp_us() + get_ntp_offset() + config.output->get_delay()); + pthread_mutex_unlock(&time_mutex); + sync_time_est = (ntp_tsp + config.delay) - (remote_time + delay_to_out); + return sync_time_est; +} + static void *rtp_receiver(void *arg) { // we inherit the signal mask (SIGUSR1) uint8_t packet[2048], *pktp; diff --git a/rtp.h b/rtp.h index d9c1d1d96..ab15fc6ba 100644 --- a/rtp.h +++ b/rtp.h @@ -7,7 +7,7 @@ int rtp_setup(SOCKADDR *remote, int *controlport, int *timingport); void rtp_record(int rtp_mode); void rtp_shutdown(void); void rtp_request_resend(seq_t first, seq_t last); -long long get_ntp_offset(); +long long get_sync_time(); long long tstp_us(); #endif // _RTP_H From 2bd82a265e26dea49a82ddb88ff92b7a5b51b4ee Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Sun, 7 Sep 2014 15:57:25 +0200 Subject: [PATCH 18/19] change to PID controller for playback_rate --- player.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/player.c b/player.c index cad5abddd..8350bf4f8 100644 --- a/player.c +++ b/player.c @@ -397,9 +397,55 @@ static int stuff_buffer(double playback_rate, short *inptr, short *outptr) { return frame_size + stuff; } -// constant first-order filter -#define ALPHA 0.05 -#define LOSS 900000.0 +typedef struct { + double p_gain; + double i_gain; + double d_gain; + double y_a_lpf; + double d_a_lpf; + double y_1; + double d_1; + double i_state; + double i_max; + double pv_1; + biquad_t lpf_y; +} pid_param; + +double reg_pid(pid_param *p, double pv, double setpoint) { +// straightforward PID controller with output filtering and D term filtering + double ek, tp, ti, td, y, dd; + + // calculate the error + ek = setpoint - pv; + + // update integral state + p->i_state += ek; + if (p->i_state > p->i_max) + p->i_state = p->i_max; + if (p->i_state < -p->i_max) + p->i_state = -p->i_max; + + // lpf derivative + dd = (p->pv_1 - pv) * p->d_a_lpf + p->d_1 * (1 - p->d_a_lpf); + + // calculate the terms + tp = ek * p->p_gain; + ti = p->i_state * p->i_gain; + td = dd * p->d_gain; + y = tp + ti + td; + debug(1, "tp: %f, ti: %f, td %f\n", tp, ti, td); + + // lpf the output + //y = y * p->y_a_lpf + p->y_1 * (1 - p->y_a_lpf); + y = biquad_filt(&(p->lpf_y), y); + + // update history + //p->y_1 = y; + p->pv_1 = pv; + p->d_1 = dd; + + return y; +} #define SYNCS_PER_S 5 @@ -411,7 +457,7 @@ static void *player_thread_func(void *arg) { int sync_count; sync_cfg sync_tag; long long sync_time, last_ntp_tsp=0; - double sync_time_diff = 0.0; + pid_param pid_p; long sync_frames = 0; state = BUFFERING; @@ -423,6 +469,14 @@ static void *player_thread_func(void *arg) { sync_count = 1 + (sampling_rate / (frame_size * SYNCS_PER_S)); debug(2, "sync_count: %d\n", sync_count); + pid_p.p_gain = 30e-7; + pid_p.i_gain = pid_p.p_gain / 120.0; + pid_p.d_gain = 0.0; //pid_p.p_gain / 100.0;; + pid_p.i_max = 1e7; + pid_p.y_a_lpf = 0.25; + pid_p.d_a_lpf = 0.2; + biquad_lpf(&pid_p.lpf_y, 0.33, 1.0); + #ifdef FANCY_RESAMPLING float *frame, *outframe; SRC_DATA srcdat; @@ -516,8 +570,7 @@ static void *player_thread_func(void *arg) { if (buf_count % sync_count == 0) { //check if we're still in sync. sync_time = get_sync_time(last_ntp_tsp + (long long)buf_count * (long long)frame_size * 1000000LL / (long long)sampling_rate); - sync_time_diff = (ALPHA * (double)sync_time) + (1.0 - ALPHA) * sync_time_diff; - bf_playback_rate = 1.0 - (sync_time_diff / LOSS); + bf_playback_rate = 1.0 + reg_pid(&pid_p, sync_time, 0); debug(2, "Playback rate %f, sync_time %lld\n", bf_playback_rate, sync_time); } play_samples = stuff_buffer(bf_playback_rate, inbuf, outbuf); From 63b7911e3852cb2d83562a58a19f73b4183f476e Mon Sep 17 00:00:00 2001 From: Frederik Slos Date: Thu, 6 Nov 2014 16:57:06 +0100 Subject: [PATCH 19/19] add biquad stuff --- common.c | 31 +++++++++++++++++++++++++++++++ common.h | 10 ++++++++++ 2 files changed, 41 insertions(+) diff --git a/common.c b/common.c index c00f643cd..70b68fbfb 100644 --- a/common.c +++ b/common.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -202,3 +203,33 @@ void command_stop(void) { if (!config.cmd_blocking) exit(0); } + +void biquad_init(biquad_t *bq, double a[], double b[]) { + bq->hist[0] = bq->hist[1] = 0.0; + memcpy(bq->a, a, 2*sizeof(double)); + memcpy(bq->b, b, 3*sizeof(double)); +} + +void biquad_lpf(biquad_t *bq, double freq, double Q) { + double w0 = 2.0 * M_PI * freq; + double alpha = sin(w0)/(2.0*Q); + + double a_0 = 1.0 + alpha; + double b[3], a[2]; + b[0] = (1.0-cos(w0))/(2.0*a_0); + b[1] = (1.0-cos(w0))/a_0; + b[2] = b[0]; + a[0] = -2.0*cos(w0)/a_0; + a[1] = (1-alpha)/a_0; + + biquad_init(bq, a, b); +} + +double biquad_filt(biquad_t *bq, double in) { + double w = in - bq->a[0]*bq->hist[0] - bq->a[1]*bq->hist[1]; + double out = bq->b[1]*bq->hist[0] + bq->b[2]*bq->hist[1] + bq->b[0]*w; + bq->hist[1] = bq->hist[0]; + bq->hist[0] = w; + + return out; +} diff --git a/common.h b/common.h index 4519df569..175d7931d 100644 --- a/common.h +++ b/common.h @@ -63,4 +63,14 @@ extern shairport_cfg config; void shairport_shutdown(int retval); void shairport_startup_complete(void); +typedef struct { + double hist[2]; + double a[2]; + double b[3]; +} biquad_t; + +void biquad_init(biquad_t *bq, double a[], double b[]); +void biquad_lpf(biquad_t *bq, double freq, double Q); +double biquad_filt(biquad_t *bq, double in); + #endif // _COMMON_H