xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued
please use https://github.com/Xpra-org/xpra instead.


Ticket #1118: sink.py

File sink.py, 14.8 KB (added by Kundan, 6 years ago)

sink.py file of xpra at location /usr/lib/python2.7/dist-packages/xpra.

Line 
1#!/usr/bin/env python
2# This file is part of Xpra.
3# Copyright (C) 2010-2015 Antoine Martin <antoine@devloop.org.uk>
4# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
5# later version. See the file COPYING for details.
6
7import sys, os, time
8from collections import deque
9from threading import Lock
10
11from xpra.sound.sound_pipeline import SoundPipeline, gobject, one_arg_signal
12from xpra.sound.pulseaudio_util import has_pa
13from xpra.sound.gstreamer_util import plugin_str, get_decoder_parser, get_queue_time, normv, get_codecs, MP3, CODEC_ORDER, gst, QUEUE_LEAK, MS_TO_NS
14
15from xpra.scripts.config import InitExit
16from xpra.util import updict, csv
17from xpra.os_util import thread
18from xpra.log import Logger
19log = Logger("sound")
20
21
22SINKS = ["autoaudiosink"]
23DEFAULT_SINK = SINKS[0]
24if has_pa():
25    SINKS.append("pulsesink")
26    DEFAULT_SINK = "pulsesink"
27if sys.platform.startswith("darwin"):
28    SINKS.append("osxaudiosink")
29    DEFAULT_SINK = "osxaudiosink"
30elif sys.platform.startswith("win"):
31    SINKS.append("directsoundsink")
32    DEFAULT_SINK = "directsoundsink"
33if os.name=="posix":
34    SINKS += ["alsasink", "osssink", "oss4sink", "jackaudiosink"]
35
36SINK_SHARED_DEFAULT_ATTRIBUTES = {"sync"    : False,
37                                  "async"   : True,
38                                  "qos"     : True
39                                  }
40
41SINK_DEFAULT_ATTRIBUTES = {0 : {
42                                "pulsesink"  : {"client" : "Xpra"}
43                               },
44                           1 : {
45                                "pulsesink"  : {"client-name" : "Xpra"}
46                               },
47                          }
48
49DEFAULT_SINK = os.environ.get("XPRA_SOUND_SINK", DEFAULT_SINK)
50if DEFAULT_SINK not in SINKS:
51    log.error("invalid default sound sink: '%s' is not in %s, using %s instead", DEFAULT_SINK, SINKS, SINKS[0])
52    DEFAULT_SINK = SINKS[0]
53QUEUE_SILENT = 0
54QUEUE_TIME = get_queue_time(450)
55
56GRACE_PERIOD = int(os.environ.get("XPRA_SOUND_GRACE_PERIOD", "2000"))
57#percentage: from 0 for no margin, to 200% which triples the buffer target
58MARGIN = max(0, min(200, int(os.environ.get("XPRA_SOUND_MARGIN", "50"))))
59
60GST_FORMAT_BUFFERS = 4
61
62
63class SoundSink(SoundPipeline):
64
65    __gsignals__ = SoundPipeline.__generic_signals__.copy()
66    __gsignals__.update({
67        "eos"       : one_arg_signal,
68        })
69
70    def __init__(self, sink_type=None, sink_options={}, codecs=get_codecs(), codec_options={}, volume=1.0):
71        if not sink_type:
72            sink_type = DEFAULT_SINK
73        if sink_type not in SINKS:
74            raise InitExit(1, "invalid sink: %s" % sink_type)
75        matching = [x for x in CODEC_ORDER if (x in codecs and x in get_codecs())]
76        log("SoundSink(..) found matching codecs %s", matching)
77        if not matching:
78            raise InitExit(1, "no matching codecs between arguments '%s' and supported list '%s'" % (csv(codecs), csv(get_codecs().keys())))
79        codec = matching[0]
80        decoder, parser = get_decoder_parser(codec)
81        SoundPipeline.__init__(self, codec)
82        self.sink_type = sink_type
83        self.levels = deque(maxlen=100)
84        decoder_str = plugin_str(decoder, codec_options)
85        pipeline_els = []
86        appsrc_el = ["appsrc",
87                     "do-timestamp=1",
88                     "name=src",
89                     "emit-signals=0",
90                     "block=0",
91                     "is-live=0",
92                     "stream-type=stream",
93                     "format=%s" % GST_FORMAT_BUFFERS]
94        pipeline_els.append(" ".join(appsrc_el))
95        pipeline_els.append(parser)
96        pipeline_els.append(decoder_str)
97        pipeline_els.append("audioconvert")
98        pipeline_els.append("audioresample")
99        pipeline_els.append("volume name=volume volume=%s" % volume)
100        queue_el = ["queue",
101                    "name=queue",
102                    "min-threshold-time=0",
103                    "max-size-buffers=0",
104                    "max-size-bytes=0",
105                    "max-size-time=%s" % QUEUE_TIME,
106                    "leaky=%s" % QUEUE_LEAK]
107        if QUEUE_SILENT:
108            queue_el.append("silent=%s" % QUEUE_SILENT)
109        pipeline_els.append(" ".join(queue_el))
110        sink_attributes = SINK_SHARED_DEFAULT_ATTRIBUTES.copy()
111        from xpra.sound.gstreamer_util import gst_major_version
112        sink_attributes.update(SINK_DEFAULT_ATTRIBUTES.get(gst_major_version, {}).get(sink_type, {}))
113        sink_attributes.update(sink_options)
114        sink_str = plugin_str(sink_type, sink_attributes)
115        pipeline_els.append(sink_str)
116        self.setup_pipeline_and_bus(pipeline_els)
117        self.volume = self.pipeline.get_by_name("volume")
118        self.src    = self.pipeline.get_by_name("src")
119        self.queue  = self.pipeline.get_by_name("queue")
120        self.overruns = 0
121        self.underruns = 0
122        self.overrun_events = deque(maxlen=100)
123        self.underrun_events = deque(maxlen=100)
124        self.queue_state = "starting"
125        self.last_underrun = 0
126        self.last_overrun = 0
127        self.last_max_update = time.time()
128        self.level_lock = Lock()
129        if QUEUE_SILENT==0:
130            self.queue.connect("overrun", self.queue_overrun)
131            self.queue.connect("underrun", self.queue_underrun)
132            self.queue.connect("running", self.queue_running)
133            self.queue.connect("pushing", self.queue_pushing)
134
135    def __repr__(self):
136        return "SoundSink('%s' - %s)" % (self.pipeline_str, self.state)
137
138    def cleanup(self):
139        SoundPipeline.cleanup(self)
140        self.sink_type = ""
141        self.src = None
142
143
144    def queue_pushing(self, *args):
145        self.queue_state = "pushing"
146        self.emit_info()
147        return True
148
149    def queue_running(self, *args):
150        self.queue_state = "running"
151        self.set_min_level()
152        self.set_max_level()
153        self.emit_info()
154        return True
155
156    def queue_underrun(self, *args):
157        now = time.time()
158        if self.queue_state=="starting" or 1000*(now-self.start_time)<GRACE_PERIOD:
159            log("ignoring underrun during startup")
160            return
161        self.queue_state = "underrun"
162        if now-self.last_underrun>2:
163            self.last_underrun = now
164            self.set_min_level()
165            self.underrun_events.append(now)
166        self.emit_info()
167        return 1
168
169    def get_level_range(self, mintime=2, maxtime=10):
170        now = time.time()
171        filtered = [v for t,v in list(self.levels) if (now-t)>=mintime and (now-t)<=maxtime]
172        if len(filtered)>=10:
173            maxl = max(filtered)
174            minl = min(filtered)
175            #range of the levels recorded:
176            return maxl-minl
177        return 0
178
179    def set_min_level(self):
180        if not self.level_lock.acquire(False):
181            return
182        try:
183            lrange = self.get_level_range()
184            if lrange>0:
185                cmtt = self.queue.get_property("min-threshold-time")//MS_TO_NS
186                #from 100% down to 0% in 2 seconds after underrun:
187                now = time.time()
188                pct = max(0, int((self.last_underrun+2-now)*50))
189                mtt = min(50, pct*max(50, lrange)//200)
190                log("set_min_level pct=%2i, cmtt=%3i, mtt=%3i", pct, cmtt, mtt)
191                if cmtt!=mtt:
192                    self.queue.set_property("min-threshold-time", mtt*MS_TO_NS)
193                    log("set_min_level min-threshold-time=%s", mtt)
194        finally:
195            self.level_lock.release()
196
197    def set_max_level(self, force=False):
198        if not self.level_lock.acquire(False):
199            return
200        try:
201            lrange = self.get_level_range(mintime=0)
202            now = time.time()
203            log("set_max_level lrange=%3i, last_max_update=%is", lrange, int(now-self.last_max_update))
204            #more than one second since last update and we have a range:
205            if now-self.last_max_update>1 and lrange>0:
206                cmst = self.queue.get_property("max-size-time")//MS_TO_NS
207                #overruns in the last minute:
208                olm = len([x for x in list(self.overrun_events) if now-x<60])
209                #increase target if we have more than 5 overruns in the last minute:
210                target_mst = lrange*(100 + MARGIN + min(100, olm*20))//100
211                #from 100% down to 0% in 2 seconds after underrun:
212                pct = max(0, int((self.last_overrun+2-now)*50))
213                #use this last_overrun percentage value to temporarily decrease the target
214                #(causes overruns that drop packets and lower the buffer level)
215                target_mst = max(50, int(target_mst - pct*lrange//100))
216                mst = (cmst + target_mst)//2
217                #cap it at 1 second:
218                mst = min(mst, 1000)
219                log("set_max_level overrun count=%-2i, margin=%3i, pct=%2i, cmst=%3i, mst=%3i", olm, MARGIN, pct, cmst, mst)
220                if force or abs(cmst-mst)>=max(50, lrange//2):
221                    self.queue.set_property("max-size-time", mst*MS_TO_NS)
222                    self.last_max_update = now
223        finally:
224            self.level_lock.release()
225
226    def queue_overrun(self, *args):
227        now = time.time()
228        if self.queue_state=="starting" or 1000*(now-self.start_time)<GRACE_PERIOD:
229            log("ignoring overrun during startup")
230            return
231        clt = self.queue.get_property("current-level-time")//MS_TO_NS
232        log("overrun level=%ims", clt)
233        now = time.time()
234        #grace period of recording overruns:
235        #(because when we record an overrun, we lower the max-time,
236        # which causes more overruns!)
237        if self.last_overrun is None or now-self.last_overrun>2:
238            self.last_overrun = now
239            self.set_max_level()
240            self.overrun_events.append(now)
241        self.overruns += 1
242        return 1
243
244    def eos(self):
245        log("eos()")
246        if self.src:
247            self.src.emit('end-of-stream')
248        self.cleanup()
249        return 0
250
251    def get_info(self):
252        info = SoundPipeline.get_info(self)
253        if QUEUE_TIME>0:
254            clt = self.queue.get_property("current-level-time")
255            qmax = self.queue.get_property("max-size-time")
256            qmin = self.queue.get_property("min-threshold-time")
257            updict(info, "queue", {
258                "min"           : qmin//MS_TO_NS,
259                "max"           : qmax//MS_TO_NS,
260                "cur"           : clt//MS_TO_NS,
261                "pct"           : min(QUEUE_TIME, clt)*100//qmax,
262                "overruns"      : self.overruns,
263                "underruns"     : self.underruns,
264                "state"         : self.queue_state})
265        return info
266
267    def add_data(self, data, metadata=None):
268        if not self.src:
269            log("add_data(..) dropped, no source")
270            return
271       # if self.state=="stopped":
272       #     log("add_data(..) dropped, pipeline is stopped")
273       #     return
274        #having a timestamp causes problems with the queue and overruns:
275        log("add_data(%s bytes, %s) queue_state=%s", len(data), metadata, self.queue_state)
276        buf = gst.new_buffer(data)
277        if metadata:
278            #having a timestamp causes problems with the queue and overruns:
279            #ts = metadata.get("timestamp")
280            #if ts is not None:
281            #    buf.timestamp = normv(ts)
282            d = metadata.get("duration")
283            if d is not None:
284                d = normv(d)
285                if d>0:
286                    buf.duration = normv(d)
287        if self.push_buffer(buf):
288            self.buffer_count += 1
289            self.byte_count += len(data)
290            clt = self.queue.get_property("current-level-time")//MS_TO_NS
291            log("pushed %5i bytes, new buffer level: %3ims, queue state=%s", len(data), clt, self.queue_state)
292            self.levels.append((time.time(), clt))
293            if self.queue_state=="pushing":
294                self.set_min_level()
295                self.set_max_level()
296        self.emit_info()
297
298    def push_buffer(self, buf):
299        #buf.size = size
300        #buf.timestamp = timestamp
301        #buf.duration = duration
302        #buf.offset = offset
303        #buf.offset_end = offset_end
304        #buf.set_caps(gst.caps_from_string(caps))
305        r = self.src.emit("push-buffer", buf)
306        if r!=gst.FLOW_OK:
307            log.error("push-buffer error: %s", r)
308            self.emit('error', "push-buffer error: %s" % r)
309            return 0
310        return 1
311
312gobject.type_register(SoundSink)
313
314
315def main():
316    from xpra.platform import init, clean
317    init("Sound-Record")
318    try:
319        from xpra.gtk_common.gobject_compat import import_glib
320        glib = import_glib()
321        args = sys.argv
322        log.enable_debug()
323        import os.path
324        if len(args) not in (2, 3):
325            print("usage: %s [-v|--verbose] filename [codec]" % sys.argv[0])
326            return 1
327        filename = args[1]
328        if not os.path.exists(filename):
329            print("file %s does not exist" % filename)
330            return 2
331        codecs = get_codecs()
332        if len(args)==3:
333            codec = args[2]
334            if codec not in codecs:
335                print("invalid codec: %s" % codec)
336                return 2
337        else:
338            codec = None
339            parts = filename.split(".")
340            if len(parts)>1:
341                extension = parts[-1]
342                if extension.lower() in codecs:
343                    codec = extension.lower()
344                    print("guessed codec %s from file extension %s" % (codec, extension))
345            if codec is None:
346                print("assuming this is an mp3 file...")
347                codec = MP3
348
349        log.enable_debug()
350        with open(filename, "rb") as f:
351            data = f.read()
352        print("loaded %s bytes from %s" % (len(data), filename))
353        #force no leak since we push all the data at once
354        global QUEUE_LEAK, GST_QUEUE_NO_LEAK, QUEUE_SILENT
355      #  QUEUE_LEAK = GST_QUEUE_NO_LEAK
356        QUEUE_LEAK = 0
357        QUEUE_SILENT = 1
358        ss = SoundSink(codecs=codec)
359#        ss = SoundSink(codec)
360        ss.add_data(data)
361        def eos(*args):
362            print("eos")
363            glib.idle_add(glib_mainloop.quit)
364        ss.connect("eos", eos)
365        ss.start()
366
367        glib_mainloop = glib.MainLoop()
368
369        import signal
370        def deadly_signal(*args):
371            glib.idle_add(glib_mainloop.quit)
372        signal.signal(signal.SIGINT, deadly_signal)
373        signal.signal(signal.SIGTERM, deadly_signal)
374
375        def check_for_end(*args):
376            qtime = ss.queue.get_property("current-level-time")//MS_TO_NS
377            if qtime<=0:
378                log.info("underrun (end of stream)")
379                thread.start_new_thread(ss.stop, ())
380                glib.timeout_add(500, glib_mainloop.quit)
381                return False
382            return True
383        glib.timeout_add(1000, check_for_end)
384
385        glib_mainloop.run()
386        return 0
387    finally:
388        clean()
389
390
391if __name__ == "__main__":
392    sys.exit(main())