blob: 4c6b0aecaffd02ec2c07f630e6c05eb72698dbb6 [file] [log] [blame]
/* GStreamer
 * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
19#include <string.h>
20#include <stdlib.h>
21
22#include "rdtjitterbuffer.h"
23#include "gstrdtbuffer.h"
24
25GST_DEBUG_CATEGORY_STATIC (rdt_jitter_buffer_debug);
26#define GST_CAT_DEFAULT rdt_jitter_buffer_debug
27
28#define MAX_WINDOW RDT_JITTER_BUFFER_MAX_WINDOW
29#define MAX_TIME (2 * GST_SECOND)
30
31/* signals and args */
32enum
33{
34 LAST_SIGNAL
35};
36
37enum
38{
39 PROP_0
40};
41
42/* GObject vmethods */
43static void rdt_jitter_buffer_finalize (GObject * object);
44
45/* static guint rdt_jitter_buffer_signals[LAST_SIGNAL] = { 0 }; */
46
47G_DEFINE_TYPE (RDTJitterBuffer, rdt_jitter_buffer, G_TYPE_OBJECT);
48
49static void
50rdt_jitter_buffer_class_init (RDTJitterBufferClass * klass)
51{
52 GObjectClass *gobject_class;
53
54 gobject_class = (GObjectClass *) klass;
55
56 gobject_class->finalize = rdt_jitter_buffer_finalize;
57
58 GST_DEBUG_CATEGORY_INIT (rdt_jitter_buffer_debug, "rdtjitterbuffer", 0,
59 "RDT Jitter Buffer");
60}
61
62static void
63rdt_jitter_buffer_init (RDTJitterBuffer * jbuf)
64{
65 jbuf->packets = g_queue_new ();
66
67 rdt_jitter_buffer_reset_skew (jbuf);
68}
69
70static void
71rdt_jitter_buffer_finalize (GObject * object)
72{
73 RDTJitterBuffer *jbuf;
74
75 jbuf = RDT_JITTER_BUFFER_CAST (object);
76
77 rdt_jitter_buffer_flush (jbuf);
78 g_queue_free (jbuf->packets);
79
80 G_OBJECT_CLASS (rdt_jitter_buffer_parent_class)->finalize (object);
81}
82
83/**
84 * rdt_jitter_buffer_new:
85 *
86 * Create an #RDTJitterBuffer.
87 *
88 * Returns: a new #RDTJitterBuffer. Use g_object_unref() after usage.
89 */
90RDTJitterBuffer *
91rdt_jitter_buffer_new (void)
92{
93 RDTJitterBuffer *jbuf;
94
95 jbuf = g_object_new (RDT_TYPE_JITTER_BUFFER, NULL);
96
97 return jbuf;
98}
99
100void
101rdt_jitter_buffer_reset_skew (RDTJitterBuffer * jbuf)
102{
103 jbuf->base_time = -1;
104 jbuf->base_rtptime = -1;
105 jbuf->ext_rtptime = -1;
106 jbuf->window_pos = 0;
107 jbuf->window_filling = TRUE;
108 jbuf->window_min = 0;
109 jbuf->skew = 0;
110 jbuf->prev_send_diff = -1;
111}
112
113/* For the clock skew we use a windowed low point averaging algorithm as can be
114 * found in http://www.grame.fr/pub/TR-050601.pdf. The idea is that the jitter is
115 * composed of:
116 *
117 * J = N + n
118 *
119 * N : a constant network delay.
120 * n : random added noise. The noise is concentrated around 0
121 *
122 * In the receiver we can track the elapsed time at the sender with:
123 *
124 * send_diff(i) = (Tsi - Ts0);
125 *
126 * Tsi : The time at the sender at packet i
127 * Ts0 : The time at the sender at the first packet
128 *
129 * This is the difference between the RDT timestamp in the first received packet
130 * and the current packet.
131 *
132 * At the receiver we have to deal with the jitter introduced by the network.
133 *
134 * recv_diff(i) = (Tri - Tr0)
135 *
136 * Tri : The time at the receiver at packet i
137 * Tr0 : The time at the receiver at the first packet
138 *
139 * Both of these values contain a jitter Ji, a jitter for packet i, so we can
140 * write:
141 *
142 * recv_diff(i) = (Cri + D + ni) - (Cr0 + D + n0))
143 *
144 * Cri : The time of the clock at the receiver for packet i
145 * D + ni : The jitter when receiving packet i
146 *
147 * We see that the network delay is irrelevant here as we can elliminate D:
148 *
149 * recv_diff(i) = (Cri + ni) - (Cr0 + n0))
150 *
151 * The drift is now expressed as:
152 *
153 * Drift(i) = recv_diff(i) - send_diff(i);
154 *
155 * We now keep the W latest values of Drift and find the minimum (this is the
156 * one with the lowest network jitter and thus the one which is least affected
157 * by it). We average this lowest value to smooth out the resulting network skew.
158 *
159 * Both the window and the weighting used for averaging influence the accuracy
160 * of the drift estimation. Finding the correct parameters turns out to be a
161 * compromise between accuracy and inertia.
162 *
163 * We use a 2 second window or up to 512 data points, which is statistically big
164 * enough to catch spikes (FIXME, detect spikes).
165 * We also use a rather large weighting factor (125) to smoothly adapt. During
166 * startup, when filling the window, we use a parabolic weighting factor, the
167 * more the window is filled, the faster we move to the detected possible skew.
168 *
169 * Returns: @time adjusted with the clock skew.
170 */
171static GstClockTime
172calculate_skew (RDTJitterBuffer * jbuf, guint32 rtptime, GstClockTime time,
173 guint32 clock_rate)
174{
175 guint64 ext_rtptime;
176 guint64 send_diff, recv_diff;
177 gint64 delta;
178 gint64 old;
179 gint pos, i;
180 GstClockTime gstrtptime, out_time;
181
182 //ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime);
183 ext_rtptime = rtptime;
184
185 gstrtptime = gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, clock_rate);
186
187again:
188 /* first time, lock on to time and gstrtptime */
189 if (jbuf->base_time == -1)
190 jbuf->base_time = time;
191 if (jbuf->base_rtptime == -1)
192 jbuf->base_rtptime = gstrtptime;
193
194 if (gstrtptime >= jbuf->base_rtptime)
195 send_diff = gstrtptime - jbuf->base_rtptime;
196 else {
197 /* elapsed time at sender, timestamps can go backwards and thus be smaller
198 * than our base time, take a new base time in that case. */
199 GST_DEBUG ("backward timestamps at server, taking new base time");
200 jbuf->base_rtptime = gstrtptime;
201 jbuf->base_time = time;
202 send_diff = 0;
203 }
204
205 GST_DEBUG ("extrtp %" G_GUINT64_FORMAT ", gstrtp %" GST_TIME_FORMAT ", base %"
206 GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT, ext_rtptime,
207 GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime),
208 GST_TIME_ARGS (send_diff));
209
210 if (jbuf->prev_send_diff != -1 && time != -1) {
211 gint64 delta_diff;
212
213 if (send_diff > jbuf->prev_send_diff)
214 delta_diff = send_diff - jbuf->prev_send_diff;
215 else
216 delta_diff = jbuf->prev_send_diff - send_diff;
217
218 /* server changed rtp timestamps too quickly, reset skew detection and start
219 * again. This value is sortof arbitrary and can be a bad measurement up if
220 * there are many packets missing because then we get a big gap that is
221 * unrelated to a timestamp switch. */
222 if (delta_diff > GST_SECOND) {
223 GST_DEBUG ("delta changed too quickly %" GST_TIME_FORMAT " reset skew",
224 GST_TIME_ARGS (delta_diff));
225 rdt_jitter_buffer_reset_skew (jbuf);
226 goto again;
227 }
228 }
229 jbuf->prev_send_diff = send_diff;
230
231 /* we don't have an arrival timestamp so we can't do skew detection. we
232 * should still apply a timestamp based on RDT timestamp and base_time */
233 if (time == -1)
234 goto no_skew;
235
236 /* elapsed time at receiver, includes the jitter */
237 recv_diff = time - jbuf->base_time;
238
239 GST_DEBUG ("time %" GST_TIME_FORMAT ", base %" GST_TIME_FORMAT ", recv_diff %"
240 GST_TIME_FORMAT, GST_TIME_ARGS (time), GST_TIME_ARGS (jbuf->base_time),
241 GST_TIME_ARGS (recv_diff));
242
243 /* measure the diff */
244 delta = ((gint64) recv_diff) - ((gint64) send_diff);
245
246 pos = jbuf->window_pos;
247
248 if (jbuf->window_filling) {
249 /* we are filling the window */
250 GST_DEBUG ("filling %d, delta %" G_GINT64_FORMAT, pos, delta);
251 jbuf->window[pos++] = delta;
252 /* calc the min delta we observed */
253 if (pos == 1 || delta < jbuf->window_min)
254 jbuf->window_min = delta;
255
256 if (send_diff >= MAX_TIME || pos >= MAX_WINDOW) {
257 jbuf->window_size = pos;
258
259 /* window filled */
260 GST_DEBUG ("min %" G_GINT64_FORMAT, jbuf->window_min);
261
262 /* the skew is now the min */
263 jbuf->skew = jbuf->window_min;
264 jbuf->window_filling = FALSE;
265 } else {
266 gint perc_time, perc_window, perc;
267
268 /* figure out how much we filled the window, this depends on the amount of
269 * time we have or the max number of points we keep. */
270 perc_time = send_diff * 100 / MAX_TIME;
271 perc_window = pos * 100 / MAX_WINDOW;
272 perc = MAX (perc_time, perc_window);
273
274 /* make a parabolic function, the closer we get to the MAX, the more value
275 * we give to the scaling factor of the new value */
276 perc = perc * perc;
277
278 /* quickly go to the min value when we are filling up, slowly when we are
279 * just starting because we're not sure it's a good value yet. */
280 jbuf->skew =
281 (perc * jbuf->window_min + ((10000 - perc) * jbuf->skew)) / 10000;
282 jbuf->window_size = pos + 1;
283 }
284 } else {
285 /* pick old value and store new value. We keep the previous value in order
286 * to quickly check if the min of the window changed */
287 old = jbuf->window[pos];
288 jbuf->window[pos++] = delta;
289
290 if (delta <= jbuf->window_min) {
291 /* if the new value we inserted is smaller or equal to the current min,
292 * it becomes the new min */
293 jbuf->window_min = delta;
294 } else if (old == jbuf->window_min) {
295 gint64 min = G_MAXINT64;
296
297 /* if we removed the old min, we have to find a new min */
298 for (i = 0; i < jbuf->window_size; i++) {
299 /* we found another value equal to the old min, we can stop searching now */
300 if (jbuf->window[i] == old) {
301 min = old;
302 break;
303 }
304 if (jbuf->window[i] < min)
305 min = jbuf->window[i];
306 }
307 jbuf->window_min = min;
308 }
309 /* average the min values */
310 jbuf->skew = (jbuf->window_min + (124 * jbuf->skew)) / 125;
311 GST_DEBUG ("delta %" G_GINT64_FORMAT ", new min: %" G_GINT64_FORMAT,
312 delta, jbuf->window_min);
313 }
314 /* wrap around in the window */
315 if (pos >= jbuf->window_size)
316 pos = 0;
317 jbuf->window_pos = pos;
318
319no_skew:
320 /* the output time is defined as the base timestamp plus the RDT time
321 * adjusted for the clock skew .*/
322 out_time = jbuf->base_time + send_diff + jbuf->skew;
323
324 GST_DEBUG ("skew %" G_GINT64_FORMAT ", out %" GST_TIME_FORMAT,
325 jbuf->skew, GST_TIME_ARGS (out_time));
326
327 return out_time;
328}
329
330/**
331 * rdt_jitter_buffer_insert:
332 * @jbuf: an #RDTJitterBuffer
333 * @buf: a buffer
334 * @time: a running_time when this buffer was received in nanoseconds
335 * @clock_rate: the clock-rate of the payload of @buf
336 * @tail: TRUE when the tail element changed.
337 *
338 * Inserts @buf into the packet queue of @jbuf. The sequence number of the
339 * packet will be used to sort the packets. This function takes ownerhip of
340 * @buf when the function returns %TRUE.
341 * @buf should have writable metadata when calling this function.
342 *
343 * Returns: %FALSE if a packet with the same number already existed.
344 */
345gboolean
346rdt_jitter_buffer_insert (RDTJitterBuffer * jbuf, GstBuffer * buf,
347 GstClockTime time, guint32 clock_rate, gboolean * tail)
348{
349 GList *list;
350 guint32 rtptime;
351 guint16 seqnum;
352 GstRDTPacket packet;
353 gboolean more;
354
355 g_return_val_if_fail (jbuf != NULL, FALSE);
356 g_return_val_if_fail (buf != NULL, FALSE);
357
358 more = gst_rdt_buffer_get_first_packet (buf, &packet);
359 /* programmer error */
360 g_return_val_if_fail (more == TRUE, FALSE);
361
362 seqnum = gst_rdt_packet_data_get_seq (&packet);
363 /* do skew calculation by measuring the difference between rtptime and the
364 * receive time, this function will retimestamp @buf with the skew corrected
365 * running time. */
366 rtptime = gst_rdt_packet_data_get_timestamp (&packet);
367
368 /* loop the list to skip strictly smaller seqnum buffers */
369 for (list = jbuf->packets->head; list; list = g_list_next (list)) {
370 guint16 qseq;
371 gint gap;
372
373 more =
374 gst_rdt_buffer_get_first_packet (GST_BUFFER_CAST (list->data), &packet);
375 /* programmer error */
376 g_return_val_if_fail (more == TRUE, FALSE);
377
378 qseq = gst_rdt_packet_data_get_seq (&packet);
379
380 /* compare the new seqnum to the one in the buffer */
381 gap = gst_rdt_buffer_compare_seqnum (seqnum, qseq);
382
383 /* we hit a packet with the same seqnum, notify a duplicate */
384 if (G_UNLIKELY (gap == 0))
385 goto duplicate;
386
387 /* seqnum > qseq, we can stop looking */
388 if (G_LIKELY (gap < 0))
389 break;
390 }
391
392
393 if (clock_rate) {
394 time = calculate_skew (jbuf, rtptime, time, clock_rate);
395 GST_BUFFER_TIMESTAMP (buf) = time;
396 }
397
398 if (list)
399 g_queue_insert_before (jbuf->packets, list, buf);
400 else
401 g_queue_push_tail (jbuf->packets, buf);
402
403 /* tail was changed when we did not find a previous packet, we set the return
404 * flag when requested. */
405 if (tail)
406 *tail = (list == NULL);
407
408 return TRUE;
409
410 /* ERRORS */
411duplicate:
412 {
413 GST_WARNING ("duplicate packet %d found", (gint) seqnum);
414 return FALSE;
415 }
416}
417
418/**
419 * rdt_jitter_buffer_pop:
420 * @jbuf: an #RDTJitterBuffer
421 *
422 * Pops the oldest buffer from the packet queue of @jbuf. The popped buffer will
423 * have its timestamp adjusted with the incomming running_time and the detected
424 * clock skew.
425 *
426 * Returns: a #GstBuffer or %NULL when there was no packet in the queue.
427 */
428GstBuffer *
429rdt_jitter_buffer_pop (RDTJitterBuffer * jbuf)
430{
431 GstBuffer *buf;
432
433 g_return_val_if_fail (jbuf != NULL, FALSE);
434
435 buf = g_queue_pop_tail (jbuf->packets);
436
437 return buf;
438}
439
440/**
441 * rdt_jitter_buffer_peek:
442 * @jbuf: an #RDTJitterBuffer
443 *
444 * Peek the oldest buffer from the packet queue of @jbuf. Register a callback
445 * with rdt_jitter_buffer_set_tail_changed() to be notified when an older packet
446 * was inserted in the queue.
447 *
448 * Returns: a #GstBuffer or %NULL when there was no packet in the queue.
449 */
450GstBuffer *
451rdt_jitter_buffer_peek (RDTJitterBuffer * jbuf)
452{
453 GstBuffer *buf;
454
455 g_return_val_if_fail (jbuf != NULL, FALSE);
456
457 buf = g_queue_peek_tail (jbuf->packets);
458
459 return buf;
460}
461
462/**
463 * rdt_jitter_buffer_flush:
464 * @jbuf: an #RDTJitterBuffer
465 *
466 * Flush all packets from the jitterbuffer.
467 */
468void
469rdt_jitter_buffer_flush (RDTJitterBuffer * jbuf)
470{
471 GstBuffer *buffer;
472
473 g_return_if_fail (jbuf != NULL);
474
475 while ((buffer = g_queue_pop_head (jbuf->packets)))
476 gst_buffer_unref (buffer);
477}
478
479/**
480 * rdt_jitter_buffer_num_packets:
481 * @jbuf: an #RDTJitterBuffer
482 *
483 * Get the number of packets currently in "jbuf.
484 *
485 * Returns: The number of packets in @jbuf.
486 */
487guint
488rdt_jitter_buffer_num_packets (RDTJitterBuffer * jbuf)
489{
490 g_return_val_if_fail (jbuf != NULL, 0);
491
492 return jbuf->packets->length;
493}
494
495/**
496 * rdt_jitter_buffer_get_ts_diff:
497 * @jbuf: an #RDTJitterBuffer
498 *
499 * Get the difference between the timestamps of first and last packet in the
500 * jitterbuffer.
501 *
502 * Returns: The difference expressed in the timestamp units of the packets.
503 */
504guint32
505rdt_jitter_buffer_get_ts_diff (RDTJitterBuffer * jbuf)
506{
507 guint64 high_ts, low_ts;
508 GstBuffer *high_buf, *low_buf;
509 guint32 result;
510
511 g_return_val_if_fail (jbuf != NULL, 0);
512
513 high_buf = g_queue_peek_head (jbuf->packets);
514 low_buf = g_queue_peek_tail (jbuf->packets);
515
516 if (!high_buf || !low_buf || high_buf == low_buf)
517 return 0;
518
519 //high_ts = gst_rtp_buffer_get_timestamp (high_buf);
520 //low_ts = gst_rtp_buffer_get_timestamp (low_buf);
521 high_ts = 0;
522 low_ts = 0;
523
524 /* it needs to work if ts wraps */
525 if (high_ts >= low_ts) {
526 result = (guint32) (high_ts - low_ts);
527 } else {
528 result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
529 }
530 return result;
531}