xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued
please use https://github.com/Xpra-org/xpra instead.


Ticket #1178: sink.py

File sink.py, 17.3 KB (added by Antoine Martin, 5 years ago)

work in progress: use pipeline pause to allow the buffer to grow

Line 
1#!/usr/bin/env python
2# This file is part of Xpra.
3# Copyright (C) 2010-2015 Antoine Martin <antoine@devloop.org.uk>
4# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
5# later version. See the file COPYING for details.
6
7import sys, os, time
8from collections import deque
9from threading import Lock
10
11from xpra.sound.sound_pipeline import SoundPipeline
12from xpra.gtk_common.gobject_util import one_arg_signal, gobject
13from xpra.sound.gstreamer_util import plugin_str, get_decoder_parser, get_queue_time, normv, get_codecs, get_default_sink, get_sink_plugins, \
14                                        MP3, CODEC_ORDER, gst, QUEUE_LEAK, GST_QUEUE_NO_LEAK, MS_TO_NS, DEFAULT_SINK_PLUGIN_OPTIONS
15from xpra.gtk_common.gobject_compat import import_glib
16
17from xpra.scripts.config import InitExit
18from xpra.util import csv
19from xpra.os_util import thread
20from xpra.log import Logger
21log = Logger("sound")
22gstlog = Logger("gstreamer")
23
24glib = import_glib()
25
26
27SINKS = get_sink_plugins()
28DEFAULT_SINK = get_default_sink()
29
30SINK_SHARED_DEFAULT_ATTRIBUTES = {"sync"    : False,
31                                  "async"   : True,
32                                  "qos"     : True
33                                  }
34
35SINK_DEFAULT_ATTRIBUTES = {0 : {
36                                "pulsesink"  : {"client" : "Xpra"}
37                               },
38                           1 : {
39                                "pulsesink"  : {"client-name" : "Xpra"}
40                               },
41                          }
42
43QUEUE_SILENT = os.environ.get("XPRA_QUEUE_SILENT", "0")=="1"
44QUEUE_TIME = get_queue_time(450)
45
46GRACE_PERIOD = int(os.environ.get("XPRA_SOUND_GRACE_PERIOD", "2000"))
47#percentage: from 0 for no margin, to 200% which triples the buffer target
48MARGIN = max(0, min(200, int(os.environ.get("XPRA_SOUND_MARGIN", "50"))))
49#how high we push up the min-level to prevent underruns:
50UNDERRUN_MIN_LEVEL = max(0, int(os.environ.get("XPRA_SOUND_UNDERRUN_MIN_LEVEL", "50")))
51
52
53GST_FORMAT_BYTES = 2
54GST_FORMAT_TIME = 3
55GST_FORMAT_BUFFERS = 4
56BUFFER_FORMAT = GST_FORMAT_BUFFERS
57
58GST_APP_STREAM_TYPE_STREAM = 0
59STREAM_TYPE = GST_APP_STREAM_TYPE_STREAM
60
61
62class SoundSink(SoundPipeline):
63
64    __gsignals__ = SoundPipeline.__generic_signals__.copy()
65    __gsignals__.update({
66        "eos"       : one_arg_signal,
67        })
68
69    def __init__(self, sink_type=None, sink_options={}, codecs=get_codecs(), codec_options={}, volume=1.0):
70        if not sink_type:
71            sink_type = DEFAULT_SINK
72        if sink_type not in SINKS:
73            raise InitExit(1, "invalid sink: %s" % sink_type)
74        matching = [x for x in CODEC_ORDER if (x in codecs and x in get_codecs())]
75        log("SoundSink(..) found matching codecs %s", matching)
76        if not matching:
77            raise InitExit(1, "no matching codecs between arguments '%s' and supported list '%s'" % (csv(codecs), csv(get_codecs().keys())))
78        codec = matching[0]
79        decoder, parser = get_decoder_parser(codec)
80        SoundPipeline.__init__(self, codec)
81        self.sink_type = sink_type
82        self.levels = deque(maxlen=100)
83        self.volume = None
84        self.src    = None
85        self.queue  = None
86        self.gst_version = False
87        self.overruns = 0
88        self.underruns = 0
89        self.overrun_events = deque(maxlen=100)
90        self.queue_state = "starting"
91        self.last_underrun = 0
92        self.last_overrun = 0
93        self.last_max_update = time.time()
94        self.level_lock = Lock()
95        decoder_str = plugin_str(decoder, codec_options)
96        pipeline_els = []
97        appsrc_el = ["appsrc",
98                     "do-timestamp=0",
99                     "name=src",
100                     "emit-signals=0",
101                     "block=0",
102                     "is-live=0",
103                     "stream-type=%s" % STREAM_TYPE,
104                     "format=%s" % BUFFER_FORMAT]
105        pipeline_els.append(" ".join(appsrc_el))
106        pipeline_els.append(parser)
107        pipeline_els.append(decoder_str)
108        pipeline_els.append("audioconvert")
109        pipeline_els.append("audioresample")
110        pipeline_els.append("volume name=volume volume=%s" % volume)
111        if QUEUE_TIME>0:
112            #pipeline_els.append("audiorate")
113            pipeline_els.append(" ".join(["queue",
114                                          "name=queue",
115                                          "min-threshold-time=0",
116                                          "max-size-buffers=0",
117                                          "max-size-bytes=0",
118                                          "max-size-time=%s" % QUEUE_TIME,
119                                          "leaky=%s" % QUEUE_LEAK]))
120        sink_attributes = SINK_SHARED_DEFAULT_ATTRIBUTES.copy()
121        from xpra.sound.gstreamer_util import gst_major_version, get_gst_version
122        #anything older than this may cause problems (ie: centos 6.x)
123        #because the attributes may not exist
124        if get_gst_version()>=(0, 10, 36):
125            sink_attributes.update(SINK_DEFAULT_ATTRIBUTES.get(gst_major_version, {}).get(sink_type, {}))
126        get_options_cb = DEFAULT_SINK_PLUGIN_OPTIONS.get(sink_type.replace("sink", ""))
127        if get_options_cb:
128            v = get_options_cb()
129            log("%s()=%s", get_options_cb, v)
130            sink_attributes.update(v)
131        sink_attributes.update(sink_options)
132        sink_attributes.update({
133                                "drift-tolerance" : 4000000,
134                                "can-activate-pull" : 1,
135                                "slave-method"  : 0,
136                                "provide-clock" : 1,
137                                "qos"           : 1,
138                                "sync"          : 1,
139                                "async"         : 1,
140                                })
141        sink_str = plugin_str(sink_type, sink_attributes)
142        pipeline_els.append(sink_str)
143        if not self.setup_pipeline_and_bus(pipeline_els):
144            return
145        self.volume = self.pipeline.get_by_name("volume")
146        self.src    = self.pipeline.get_by_name("src")
147        self.queue  = self.pipeline.get_by_name("queue")
148        self.gst_version = get_gst_version()
149        if self.queue:
150            if not QUEUE_SILENT:
151                self.queue.connect("overrun", self.queue_overrun)
152                self.queue.connect("underrun", self.queue_underrun)
153                self.queue.connect("running", self.queue_running)
154                self.queue.connect("pushing", self.queue_pushing)
155            else:
156                #older versions may not have the "silent" attribute,
157                #in which case we will emit the signals for nothing
158                try:
159                    self.queue.set_property("silent", False)
160                except Exception as e:
161                    log("cannot silence the queue %s: %s", self.queue, e)
162
163    def __repr__(self):
164        return "SoundSink('%s' - %s)" % (self.pipeline_str, self.state)
165
166    def cleanup(self):
167        SoundPipeline.cleanup(self)
168        self.sink_type = ""
169        self.src = None
170
171
172    def queue_pushing(self, *args):
173        gstlog("queue_pushing")
174        self.queue_state = "pushing"
175        self.emit_info()
176        return True
177
178    def queue_running(self, *args):
179        gstlog("queue_running")
180        self.queue_state = "running"
181        self.set_max_level()
182        self.emit_info()
183        return True
184
185    def queue_underrun(self, *args):
186        now = time.time()
187        gstlog("queue_underrun")
188        self.queue_state = "underrun"
189        if now-self.last_underrun>5:
190            self.last_underrun = now
191            clt = self.queue.get_property("current-level-time")//MS_TO_NS
192            mintt = self.queue.get_property("min-threshold-time")//MS_TO_NS
193            maxtt = self.queue.get_property("max-size-time")//MS_TO_NS
194            gstlog.warn("underrun: clt=%s mintt=%s maxtt=%s state=%s", clt, mintt, maxtt, self.state)
195            if clt==0 and mintt==0 and self.state in ("running", "active"):
196                #briefly raise the min threshold to try to fill up the queue
197                #seems to at least stop the sink from emptying the buffer too quickly
198                v = max(0, min(250, maxtt-50))
199                self.pipeline.set_state(gst.STATE_PAUSED)
200                gstlog("underrun: temporarily pausing the pipeline")
201                #self.queue.set_property("min-threshold-time", v)
202                def resume():
203                    gstlog("underrun: resuming pipeline from %s", self.state)
204                    if self.state=="paused":
205                        self.pipeline.set_state(gst.STATE_PLAYING)
206                        return False
207                    if self.state=="stopped":
208                        return False
209                    #run again later:
210                    return True
211                    #self.queue.set_property("min-threshold-time", 0)
212                glib.timeout_add(v, resume)
213        self.emit_info()
214        return 1
215
216    def get_level_range(self, mintime=2, maxtime=10):
217        now = time.time()
218        filtered = [v for t,v in list(self.levels) if (now-t)>=mintime and (now-t)<=maxtime]
219        if len(filtered)>=10:
220            maxl = max(filtered)
221            minl = min(filtered)
222            #range of the levels recorded:
223            return maxl-minl
224        return 0
225
226    def set_max_level(self, force=False):
227        if self.gst_version<(0, 10, 36):
228            #causes scratchy sound with older versions of gstreamer 0.10
229            #ie: centos6
230            return
231        if not self.level_lock.acquire(False):
232            return
233        try:
234            lrange = self.get_level_range(mintime=0)
235            now = time.time()
236            log("set_max_level lrange=%3i, last_max_update=%is", lrange, int(now-self.last_max_update))
237            #more than one second since last update and we have a range:
238            if now-self.last_max_update>1 and self.queue:
239                cmst = self.queue.get_property("max-size-time")//MS_TO_NS
240                #overruns in the last minute:
241                olm = len([x for x in list(self.overrun_events) if now-x<60])
242                #increase target if we have more than 5 overruns in the last minute:
243                target_mst = lrange*(100 + MARGIN + min(100, olm*20))//100
244                #from 100% down to 0% in 2 seconds after underrun:
245                pct = max(0, int((self.last_overrun+2-now)*50))
246                #use this last_overrun percentage value to temporarily decrease the target
247                #(causes overruns that drop packets and lower the buffer level)
248                target_mst = max(50, int(target_mst - pct*lrange//100))
249                mst = (cmst + target_mst)//2
250                #cap it at 1 second:
251                mst = min(mst, 1000)
252                log("set_max_level overrun count=%-2i, margin=%3i, pct=%2i, cmst=%3i, mst=%3i", olm, MARGIN, pct, cmst, mst)
253                if force or abs(cmst-mst)>=max(50, lrange//2):
254                    self.queue.set_property("max-size-time", mst*MS_TO_NS)
255                    self.last_max_update = now
256        finally:
257            self.level_lock.release()
258
259    def queue_overrun(self, *args):
260        now = time.time()
261        if self.queue_state=="starting" or 1000*(now-self.start_time)<GRACE_PERIOD:
262            gstlog("ignoring overrun during startup")
263            return 1
264        clt = self.queue.get_property("current-level-time")//MS_TO_NS
265        log("overrun level=%ims", clt)
266        now = time.time()
267        #grace period of recording overruns:
268        #(because when we record an overrun, we lower the max-time,
269        # which causes more overruns!)
270        if self.last_overrun is None or now-self.last_overrun>2:
271            self.last_overrun = now
272            self.set_max_level()
273            self.overrun_events.append(now)
274        self.overruns += 1
275        return 1
276
277    def eos(self):
278        gstlog("eos()")
279        if self.src:
280            self.src.emit('end-of-stream')
281        self.cleanup()
282        return 0
283
284    def get_info(self):
285        info = SoundPipeline.get_info(self)
286        if QUEUE_TIME>0 and self.queue:
287            clt = self.queue.get_property("current-level-time")
288            qmax = self.queue.get_property("max-size-time")
289            qmin = self.queue.get_property("min-threshold-time")
290            info["queue"] = {
291                             "min"          : qmin//MS_TO_NS,
292                             "max"          : qmax//MS_TO_NS,
293                             "cur"          : clt//MS_TO_NS,
294                             "pct"          : min(QUEUE_TIME, clt)*100//qmax,
295                             "overruns"     : self.overruns,
296                             "underruns"    : self.underruns,
297                             "state"        : self.queue_state,
298                             }
299        return info
300
301    def add_data(self, data, metadata=None):
302        if not self.src:
303            log("add_data(..) dropped, no source")
304            return
305        if self.state=="stopped":
306            log("add_data(..) dropped, pipeline is stopped")
307            return
308        #having a timestamp causes problems with the queue and overruns:
309        log("add_data(%s bytes, %s) queue_state=%s", len(data), metadata, self.queue_state)
310        buf = gst.new_buffer(data)
311        if metadata:
312            #having a timestamp causes problems with the queue and overruns:
313            #ts = metadata.get("timestamp")
314            #if ts is not None:
315            #    buf.timestamp = normv(ts)
316            #    log.info("timestamp=%s", ts)
317            d = metadata.get("duration")
318            if d is not None:
319                d = normv(d)
320                if d>0:
321                    buf.duration = normv(d)
322        if self.push_buffer(buf):
323            self.inc_buffer_count()
324            self.inc_byte_count(len(data))
325            if self.queue:
326                clt = self.queue.get_property("current-level-time")//MS_TO_NS
327                log("pushed %5i bytes, new buffer level: %3ims, queue state=%s", len(data), clt, self.queue_state)
328                self.levels.append((time.time(), clt))
329            if self.queue_state=="pushing":
330                self.set_max_level()
331        self.emit_info()
332
333    def push_buffer(self, buf):
334        #buf.size = size
335        #buf.timestamp = timestamp
336        #buf.duration = duration
337        #buf.offset = offset
338        #buf.offset_end = offset_end
339        #buf.set_caps(gst.caps_from_string(caps))
340        r = self.src.emit("push-buffer", buf)
341        if r!=gst.FLOW_OK:
342            if self.queue_state != "error":
343                log.error("Error pushing buffer: %s", r)
344                self.update_state("error")
345                self.emit('error', "push-buffer error: %s" % r)
346            return 0
347        return 1
348
349gobject.type_register(SoundSink)
350
351
352def main():
353    from xpra.platform import program_context
354    with program_context("Sound-Record"):
355        args = sys.argv
356        log.enable_debug()
357        import os.path
358        if len(args) not in (2, 3):
359            print("usage: %s [-v|--verbose] filename [codec]" % sys.argv[0])
360            return 1
361        filename = args[1]
362        if not os.path.exists(filename):
363            print("file %s does not exist" % filename)
364            return 2
365        codecs = get_codecs()
366        if len(args)==3:
367            codec = args[2]
368            if codec not in codecs:
369                print("invalid codec: %s" % codec)
370                print("only supported: %s" % str(codecs.keys()))
371                return 2
372            codecs = [codec]
373        else:
374            codec = None
375            parts = filename.split(".")
376            if len(parts)>1:
377                extension = parts[-1]
378                if extension.lower() in codecs:
379                    codec = extension.lower()
380                    print("guessed codec %s from file extension %s" % (codec, extension))
381            if codec is None:
382                print("assuming this is an mp3 file...")
383                codec = MP3
384            codecs = [codec]
385
386        log.enable_debug()
387        with open(filename, "rb") as f:
388            data = f.read()
389        print("loaded %s bytes from %s" % (len(data), filename))
390        #force no leak since we push all the data at once
391        global QUEUE_LEAK, QUEUE_SILENT
392        QUEUE_LEAK = GST_QUEUE_NO_LEAK
393        QUEUE_SILENT = True
394        ss = SoundSink(codecs=codecs)
395        def eos(*args):
396            print("eos")
397            glib.idle_add(glib_mainloop.quit)
398        ss.connect("eos", eos)
399        ss.start()
400
401        glib_mainloop = glib.MainLoop()
402
403        import signal
404        def deadly_signal(*args):
405            glib.idle_add(ss.stop)
406            glib.idle_add(glib_mainloop.quit)
407            def force_quit(sig, frame):
408                sys.exit()
409            signal.signal(signal.SIGINT, force_quit)
410            signal.signal(signal.SIGTERM, force_quit)
411        from xpra.gtk_common.gobject_compat import is_gtk3
412        if not is_gtk3():
413            signal.signal(signal.SIGINT, deadly_signal)
414        signal.signal(signal.SIGTERM, deadly_signal)
415
416        def check_for_end(*args):
417            qtime = ss.queue.get_property("current-level-time")//MS_TO_NS
418            if qtime<=0:
419                log.info("underrun (end of stream)")
420                thread.start_new_thread(ss.stop, ())
421                glib.timeout_add(500, glib_mainloop.quit)
422                return False
423            return True
424        glib.timeout_add(1000, check_for_end)
425        glib.idle_add(ss.add_data, data)
426
427        glib_mainloop.run()
428        return 0
429
430
431if __name__ == "__main__":
432    sys.exit(main())