1 | #!/usr/bin/env python |
---|
2 | # This file is part of Xpra. |
---|
3 | # Copyright (C) 2010-2015 Antoine Martin <antoine@devloop.org.uk> |
---|
4 | # Xpra is released under the terms of the GNU GPL v2, or, at your option, any |
---|
5 | # later version. See the file COPYING for details. |
---|
6 | |
---|
7 | import sys, os, time |
---|
8 | from collections import deque |
---|
9 | from threading import Lock |
---|
10 | |
---|
11 | from xpra.sound.sound_pipeline import SoundPipeline |
---|
12 | from xpra.gtk_common.gobject_util import one_arg_signal, gobject |
---|
13 | from xpra.sound.gstreamer_util import plugin_str, get_decoder_parser, get_queue_time, normv, get_codecs, get_default_sink, get_sink_plugins, \ |
---|
14 | MP3, CODEC_ORDER, gst, QUEUE_LEAK, GST_QUEUE_NO_LEAK, MS_TO_NS, DEFAULT_SINK_PLUGIN_OPTIONS |
---|
15 | from xpra.gtk_common.gobject_compat import import_glib |
---|
16 | |
---|
17 | from xpra.scripts.config import InitExit |
---|
18 | from xpra.util import csv |
---|
19 | from xpra.os_util import thread |
---|
20 | from xpra.log import Logger |
---|
log = Logger("sound")
gstlog = Logger("gstreamer")

glib = import_glib()


#the sink plugins we can use, and the one used when none is specified:
SINKS = get_sink_plugins()
DEFAULT_SINK = get_default_sink()

#attributes applied to all the sink types:
SINK_SHARED_DEFAULT_ATTRIBUTES = {
    "sync"      : False,
    "async"     : True,
    "qos"       : True,
    }

#extra attributes for specific sink types,
#the top level key is the gstreamer major version:
SINK_DEFAULT_ATTRIBUTES = {
    0   : {"pulsesink"  : {"client"         : "Xpra"}},
    1   : {"pulsesink"  : {"client-name"    : "Xpra"}},
    }

#when set, we do not connect to the queue signals:
QUEUE_SILENT = os.environ.get("XPRA_QUEUE_SILENT", "0")=="1"
#the initial "max-size-time" of the queue (no queue is used if this is zero or less):
QUEUE_TIME = get_queue_time(450)

#overruns are ignored for this many milliseconds after starting:
GRACE_PERIOD = int(os.environ.get("XPRA_SOUND_GRACE_PERIOD", "2000"))
#percentage: from 0 for no margin, to 200% which triples the buffer target
MARGIN = min(200, max(0, int(os.environ.get("XPRA_SOUND_MARGIN", "50"))))
#how high we push up the min-level to prevent underruns:
UNDERRUN_MIN_LEVEL = max(0, int(os.environ.get("XPRA_SOUND_UNDERRUN_MIN_LEVEL", "50")))


#numeric values used for the appsrc "format" property:
GST_FORMAT_BYTES = 2
GST_FORMAT_TIME = 3
GST_FORMAT_BUFFERS = 4
BUFFER_FORMAT = GST_FORMAT_BUFFERS

#numeric value used for the appsrc "stream-type" property:
GST_APP_STREAM_TYPE_STREAM = 0
STREAM_TYPE = GST_APP_STREAM_TYPE_STREAM
---|
60 | |
---|
61 | |
---|
class SoundSink(SoundPipeline):
    """
    Sound playback pipeline.

    The encoded buffers given to add_data() are pushed into an "appsrc"
    element, followed by the parser and decoder for the codec selected,
    "audioconvert", "audioresample", a "volume" element, an optional "queue"
    (only if QUEUE_TIME is positive) and finally the sink plugin.
    When the queue signals are connected, they are used to keep track
    of the queue state and to adjust the queue's "max-size-time".
    """

    __gsignals__ = SoundPipeline.__generic_signals__.copy()
    __gsignals__.update({
        "eos"       : one_arg_signal,
        })

    def __init__(self, sink_type=None, sink_options=None, codecs=None, codec_options=None, volume=1.0):
        """
        Builds the playback pipeline.

        sink_type: name of the sink plugin, DEFAULT_SINK is used if unset
        sink_options: extra attributes for the sink plugin (optional dict)
        codecs: the codecs we are allowed to use, the first one found
            in CODEC_ORDER which is also supported is selected.
            Defaults to all the codecs returned by get_codecs().
        codec_options: attributes for the decoder plugin (optional dict)
        volume: initial value of the "volume" element

        Raises InitExit if the sink is not valid or if no codec matches.
        """
        if not sink_type:
            sink_type = DEFAULT_SINK
        if sink_type not in SINKS:
            raise InitExit(1, "invalid sink: %s" % sink_type)
        #the defaults are resolved here rather than in the signature
        #so that no mutable object is shared between calls,
        #and so that the codec list is not frozen at import time:
        supported = get_codecs()
        if codecs is None:
            codecs = supported
        if sink_options is None:
            sink_options = {}
        if codec_options is None:
            codec_options = {}
        matching = [x for x in CODEC_ORDER if (x in codecs and x in supported)]
        log("SoundSink(..) found matching codecs %s", matching)
        if not matching:
            raise InitExit(1, "no matching codecs between arguments '%s' and supported list '%s'" % (csv(codecs), csv(supported.keys())))
        codec = matching[0]
        decoder, parser = get_decoder_parser(codec)
        SoundPipeline.__init__(self, codec)
        self.sink_type = sink_type
        #(timestamp, queue level in ms) recorded after each buffer is pushed:
        self.levels = deque(maxlen=100)
        self.volume = None
        self.src = None
        self.queue = None
        #replaced with the real version once the pipeline has been setup:
        self.gst_version = False
        self.overruns = 0
        self.underruns = 0
        #timestamps of the overruns recorded:
        self.overrun_events = deque(maxlen=100)
        self.queue_state = "starting"
        self.last_underrun = 0
        self.last_overrun = 0
        self.last_max_update = time.time()
        #prevents concurrent calls to set_max_level:
        self.level_lock = Lock()
        decoder_str = plugin_str(decoder, codec_options)
        pipeline_els = []
        appsrc_el = ["appsrc",
                     "do-timestamp=0",
                     "name=src",
                     "emit-signals=0",
                     "block=0",
                     "is-live=0",
                     "stream-type=%s" % STREAM_TYPE,
                     "format=%s" % BUFFER_FORMAT]
        pipeline_els.append(" ".join(appsrc_el))
        pipeline_els.append(parser)
        pipeline_els.append(decoder_str)
        pipeline_els.append("audioconvert")
        pipeline_els.append("audioresample")
        pipeline_els.append("volume name=volume volume=%s" % volume)
        if QUEUE_TIME>0:
            #pipeline_els.append("audiorate")
            pipeline_els.append(" ".join(["queue",
                                          "name=queue",
                                          "min-threshold-time=0",
                                          "max-size-buffers=0",
                                          "max-size-bytes=0",
                                          "max-size-time=%s" % QUEUE_TIME,
                                          "leaky=%s" % QUEUE_LEAK]))
        sink_attributes = SINK_SHARED_DEFAULT_ATTRIBUTES.copy()
        from xpra.sound.gstreamer_util import gst_major_version, get_gst_version
        #anything older than this may cause problems (ie: centos 6.x)
        #because the attributes may not exist
        if get_gst_version()>=(0, 10, 36):
            sink_attributes.update(SINK_DEFAULT_ATTRIBUTES.get(gst_major_version, {}).get(sink_type, {}))
            get_options_cb = DEFAULT_SINK_PLUGIN_OPTIONS.get(sink_type.replace("sink", ""))
            if get_options_cb:
                v = get_options_cb()
                log("%s()=%s", get_options_cb, v)
                sink_attributes.update(v)
        sink_attributes.update(sink_options)
        #these are applied last, so they take precedence
        #over both the defaults and the sink_options supplied:
        sink_attributes.update({
                                "drift-tolerance"   : 4000000,
                                "can-activate-pull" : 1,
                                "slave-method"      : 0,
                                "provide-clock"     : 1,
                                "qos"               : 1,
                                "sync"              : 1,
                                "async"             : 1,
                                })
        sink_str = plugin_str(sink_type, sink_attributes)
        pipeline_els.append(sink_str)
        if not self.setup_pipeline_and_bus(pipeline_els):
            return
        self.volume = self.pipeline.get_by_name("volume")
        self.src    = self.pipeline.get_by_name("src")
        self.queue  = self.pipeline.get_by_name("queue")
        self.gst_version = get_gst_version()
        if self.queue:
            if not QUEUE_SILENT:
                self.queue.connect("overrun", self.queue_overrun)
                self.queue.connect("underrun", self.queue_underrun)
                self.queue.connect("running", self.queue_running)
                self.queue.connect("pushing", self.queue_pushing)
            else:
                #we have no handlers connected, so tell the queue
                #not to emit its signals ("silent" must be True for that).
                #older versions may not have the "silent" attribute,
                #in which case we will emit the signals for nothing
                try:
                    self.queue.set_property("silent", True)
                except Exception as e:
                    log("cannot silence the queue %s: %s", self.queue, e)

    def __repr__(self):
        return "SoundSink('%s' - %s)" % (self.pipeline_str, self.state)

    def cleanup(self):
        """ Cleans up the pipeline and forgets the sink type and the source element. """
        SoundPipeline.cleanup(self)
        self.sink_type = ""
        self.src = None


    def queue_pushing(self, *args):
        """ Handler for the queue's "pushing" signal: records the new queue state. """
        gstlog("queue_pushing")
        self.queue_state = "pushing"
        self.emit_info()
        return True

    def queue_running(self, *args):
        """
        Handler for the queue's "running" signal:
        records the new queue state and re-evaluates the queue's maximum level.
        """
        gstlog("queue_running")
        self.queue_state = "running"
        self.set_max_level()
        self.emit_info()
        return True

    def queue_underrun(self, *args):
        """
        Handler for the queue's "underrun" signal.

        Every underrun is counted, but the warning and the recovery attempt
        only happen at most once every 5 seconds:
        if the queue is empty and the pipeline is "running" or "active",
        the pipeline is paused and a timer is used to resume it.
        """
        now = time.time()
        gstlog("queue_underrun")
        self.queue_state = "underrun"
        #this counter is exposed by get_info():
        self.underruns += 1
        if now-self.last_underrun>5:
            self.last_underrun = now
            clt = self.queue.get_property("current-level-time")//MS_TO_NS
            mintt = self.queue.get_property("min-threshold-time")//MS_TO_NS
            maxtt = self.queue.get_property("max-size-time")//MS_TO_NS
            gstlog.warn("underrun: clt=%s mintt=%s maxtt=%s state=%s", clt, mintt, maxtt, self.state)
            if clt==0 and mintt==0 and self.state in ("running", "active"):
                #briefly pause the pipeline to try to fill up the queue,
                #for up to 250ms and always less than the queue's max-size-time:
                v = max(0, min(250, maxtt-50))
                self.pipeline.set_state(gst.STATE_PAUSED)
                gstlog("underrun: temporarily pausing the pipeline")
                #self.queue.set_property("min-threshold-time", v)
                def resume():
                    gstlog("underrun: resuming pipeline from %s", self.state)
                    if self.state=="paused":
                        self.pipeline.set_state(gst.STATE_PLAYING)
                        return False
                    if self.state=="stopped":
                        return False
                    #the state change has not happened yet,
                    #run again later:
                    return True
                    #self.queue.set_property("min-threshold-time", 0)
                glib.timeout_add(v, resume)
        self.emit_info()
        return 1

    def get_level_range(self, mintime=2, maxtime=10):
        """
        Returns the difference between the highest and the lowest queue levels
        recorded between 'mintime' and 'maxtime' seconds ago,
        or 0 if we have fewer than 10 samples for this period.
        """
        now = time.time()
        filtered = [v for t,v in list(self.levels) if (now-t)>=mintime and (now-t)<=maxtime]
        if len(filtered)>=10:
            maxl = max(filtered)
            minl = min(filtered)
            #range of the levels recorded:
            return maxl-minl
        return 0

    def set_max_level(self, force=False):
        """
        Adjusts the queue's "max-size-time" using the range of the levels
        recorded recently and the number of overruns seen in the last minute.

        The value is only re-calculated if the last change is more than
        one second old, and it is only applied if it differs enough from
        the current value, unless 'force' is set.
        Does nothing if another call is already in progress.
        """
        if not self.gst_version or self.gst_version<(0, 10, 36):
            #the version is not set if the pipeline setup failed
            #(and it cannot be compared with a tuple when unset)
            #causes scratchy sound with older versions of gstreamer 0.10
            #ie: centos6
            return
        if not self.level_lock.acquire(False):
            return
        try:
            lrange = self.get_level_range(mintime=0)
            now = time.time()
            log("set_max_level lrange=%3i, last_max_update=%is", lrange, int(now-self.last_max_update))
            #more than one second since last update and we have a queue:
            if now-self.last_max_update>1 and self.queue:
                cmst = self.queue.get_property("max-size-time")//MS_TO_NS
                #overruns in the last minute:
                olm = len([x for x in list(self.overrun_events) if now-x<60])
                #each overrun recorded in the last minute increases the target by 20%,
                #up to 100% (which is reached with 5 overruns or more):
                target_mst = lrange*(100 + MARGIN + min(100, olm*20))//100
                #from 100% down to 0% in 2 seconds after an overrun:
                pct = max(0, int((self.last_overrun+2-now)*50))
                #use this last_overrun percentage value to temporarily decrease the target
                #(causes overruns that drop packets and lower the buffer level)
                target_mst = max(50, int(target_mst - pct*lrange//100))
                #move half way towards the target:
                mst = (cmst + target_mst)//2
                #cap it at 1 second:
                mst = min(mst, 1000)
                log("set_max_level overrun count=%-2i, margin=%3i, pct=%2i, cmst=%3i, mst=%3i", olm, MARGIN, pct, cmst, mst)
                if force or abs(cmst-mst)>=max(50, lrange//2):
                    self.queue.set_property("max-size-time", mst*MS_TO_NS)
                    self.last_max_update = now
        finally:
            self.level_lock.release()

    def queue_overrun(self, *args):
        """
        Handler for the queue's "overrun" signal.

        Overruns are ignored whilst the queue is still "starting"
        and during the GRACE_PERIOD which follows the start time.
        """
        now = time.time()
        if self.queue_state=="starting" or 1000*(now-self.start_time)<GRACE_PERIOD:
            gstlog("ignoring overrun during startup")
            return 1
        clt = self.queue.get_property("current-level-time")//MS_TO_NS
        log("overrun level=%ims", clt)
        now = time.time()
        #grace period of recording overruns:
        #(because when we record an overrun, we lower the max-time,
        # which causes more overruns!)
        if self.last_overrun is None or now-self.last_overrun>2:
            self.last_overrun = now
            self.set_max_level()
            self.overrun_events.append(now)
        self.overruns += 1
        return 1

    def eos(self):
        """ Emits "end-of-stream" on the source element (if we have one), then cleans up. """
        gstlog("eos()")
        if self.src:
            self.src.emit('end-of-stream')
        self.cleanup()
        return 0

    def get_info(self):
        """
        Returns the info from SoundPipeline.get_info(),
        with a "queue" entry added if we have a queue:
        the levels are in milliseconds.
        """
        info = SoundPipeline.get_info(self)
        if QUEUE_TIME>0 and self.queue:
            clt = self.queue.get_property("current-level-time")
            qmax = self.queue.get_property("max-size-time")
            qmin = self.queue.get_property("min-threshold-time")
            #a max-size-time of zero would cause a division by zero:
            if qmax>0:
                pct = min(QUEUE_TIME, clt)*100//qmax
            else:
                pct = 0
            info["queue"] = {
                             "min"          : qmin//MS_TO_NS,
                             "max"          : qmax//MS_TO_NS,
                             "cur"          : clt//MS_TO_NS,
                             "pct"          : pct,
                             "overruns"     : self.overruns,
                             "underruns"    : self.underruns,
                             "state"        : self.queue_state,
                             }
        return info

    def add_data(self, data, metadata=None):
        """
        Wraps 'data' in a new buffer and pushes it into the pipeline.

        data: the encoded sound data
        metadata: optional dictionary, only the "duration" is used

        The data is dropped if we have no source element or if the pipeline is stopped.
        """
        if not self.src:
            log("add_data(..) dropped, no source")
            return
        if self.state=="stopped":
            log("add_data(..) dropped, pipeline is stopped")
            return
        log("add_data(%s bytes, %s) queue_state=%s", len(data), metadata, self.queue_state)
        buf = gst.new_buffer(data)
        if metadata:
            #having a timestamp causes problems with the queue and overruns:
            #ts = metadata.get("timestamp")
            #if ts is not None:
            #    buf.timestamp = normv(ts)
            #    log.info("timestamp=%s", ts)
            d = metadata.get("duration")
            if d is not None:
                d = normv(d)
                if d>0:
                    buf.duration = normv(d)
        if self.push_buffer(buf):
            self.inc_buffer_count()
            self.inc_byte_count(len(data))
            if self.queue:
                clt = self.queue.get_property("current-level-time")//MS_TO_NS
                log("pushed %5i bytes, new buffer level: %3ims, queue state=%s", len(data), clt, self.queue_state)
                #record the level, this is used by get_level_range:
                self.levels.append((time.time(), clt))
            if self.queue_state=="pushing":
                self.set_max_level()
        self.emit_info()

    def push_buffer(self, buf):
        """
        Pushes the buffer into the source element.

        Returns 1 if the buffer was accepted, 0 otherwise.
        A failure is only logged and signalled (via the "error" signal)
        if we are not already in the "error" state.
        """
        #buf.size = size
        #buf.timestamp = timestamp
        #buf.duration = duration
        #buf.offset = offset
        #buf.offset_end = offset_end
        #buf.set_caps(gst.caps_from_string(caps))
        r = self.src.emit("push-buffer", buf)
        if r!=gst.FLOW_OK:
            #the queue state is never set to "error" by this class,
            #so we also have to check the pipeline state to avoid repeating the error
            #(assumes that update_state("error") updates self.state - TODO confirm)
            if self.state != "error" and self.queue_state != "error":
                log.error("Error pushing buffer: %s", r)
                self.update_state("error")
                self.emit('error', "push-buffer error: %s" % r)
            return 0
        return 1

gobject.type_register(SoundSink)
---|
350 | |
---|
351 | |
---|
def main():
    """
    Test utility: plays the file given on the command line using a SoundSink.

    usage: [-v|--verbose] filename [codec]
    If the codec is not specified, it is guessed from the file extension
    and we fallback to mp3.
    Debug logging is always enabled, the verbose flags are accepted and ignored.

    Returns 0 once the main loop has exited,
    1 for usage errors, 2 if the file or the codec are not valid.
    """
    #we override the module settings before creating the sink:
    global QUEUE_LEAK, QUEUE_SILENT
    from xpra.platform import program_context
    with program_context("Sound-Record"):
        #the verbose flags are advertised in the usage message,
        #remove them so that they are not mistaken for the filename:
        args = [x for x in sys.argv if x not in ("-v", "--verbose")]
        log.enable_debug()
        import os.path
        if len(args) not in (2, 3):
            print("usage: %s [-v|--verbose] filename [codec]" % sys.argv[0])
            return 1
        filename = args[1]
        if not os.path.exists(filename):
            print("file %s does not exist" % filename)
            return 2
        codecs = get_codecs()
        if len(args)==3:
            codec = args[2]
            if codec not in codecs:
                print("invalid codec: %s" % codec)
                print("only supported: %s" % str(codecs.keys()))
                return 2
            codecs = [codec]
        else:
            codec = None
            parts = filename.split(".")
            if len(parts)>1:
                extension = parts[-1]
                if extension.lower() in codecs:
                    codec = extension.lower()
                    print("guessed codec %s from file extension %s" % (codec, extension))
            if codec is None:
                print("assuming this is an mp3 file...")
                codec = MP3
            codecs = [codec]

        with open(filename, "rb") as f:
            data = f.read()
        print("loaded %s bytes from %s" % (len(data), filename))
        #force no leak since we push all the data at once
        QUEUE_LEAK = GST_QUEUE_NO_LEAK
        QUEUE_SILENT = True
        ss = SoundSink(codecs=codecs)
        def eos(*args):
            print("eos")
            glib.idle_add(glib_mainloop.quit)
        ss.connect("eos", eos)
        ss.start()

        glib_mainloop = glib.MainLoop()

        import signal
        def deadly_signal(*args):
            #stop the sink and exit the main loop from the main thread:
            glib.idle_add(ss.stop)
            glib.idle_add(glib_mainloop.quit)
        def force_quit(sig, frame):
            sys.exit()
        signal.signal(signal.SIGINT, force_quit)
        signal.signal(signal.SIGTERM, force_quit)
        from xpra.gtk_common.gobject_compat import is_gtk3
        if not is_gtk3():
            signal.signal(signal.SIGINT, deadly_signal)
            signal.signal(signal.SIGTERM, deadly_signal)

        def check_for_end(*args):
            #all the data is pushed at once,
            #so we are done when the queue is empty
            if not ss.queue:
                #there is no queue if QUEUE_TIME is not positive
                #or if the pipeline could not be setup
                log.info("no queue to monitor, cannot detect the end of the stream")
                return False
            qtime = ss.queue.get_property("current-level-time")//MS_TO_NS
            if qtime<=0:
                log.info("underrun (end of stream)")
                thread.start_new_thread(ss.stop, ())
                glib.timeout_add(500, glib_mainloop.quit)
                return False
            return True
        glib.timeout_add(1000, check_for_end)
        glib.idle_add(ss.add_data, data)

        glib_mainloop.run()
        return 0
---|
429 | |
---|
430 | |
---|
if __name__ == "__main__":
    #the value returned by main() is used as the exit code:
    exit_code = main()
    sys.exit(exit_code)
---|