xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued
please use https://github.com/Xpra-org/xpra instead.


Ticket #849: sound-tune.patch

File sound-tune.patch, 10.2 KB (added by Antoine Martin, 6 years ago)

try to tune the queue dynamically to minimize buffering

  • xpra/sound/sink.py

     
    44# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
    55# later version. See the file COPYING for details.
    66
    7 import sys, os
     7import sys, os, time
     8from collections import deque
    89
    910from xpra.sound.sound_pipeline import SoundPipeline, gobject, glib, one_arg_signal
    1011from xpra.sound.pulseaudio_util import has_pa
     
    1415from xpra.os_util import thread
    1516from xpra.log import Logger
    1617log = Logger("sound")
     18log.enable_debug()
    1719
    1820
    1921SINKS = ["autoaudiosink"]
     
    7375        decoder, parser = get_decoder_parser(codec)
    7476        SoundPipeline.__init__(self, codec)
    7577        self.sink_type = sink_type
     78        self.levels = deque(maxlen=100)
    7679        decoder_str = plugin_str(decoder, codec_options)
    7780        pipeline_els = []
    78         pipeline_els.append("appsrc"+
    79                             " name=src"+
    80                             " emit-signals=0"+
    81                             " block=0"+
    82                             " is-live=0"+
    83                             " stream-type=stream"+
    84                             " format=%s" % GST_FORMAT_BUFFERS)
     81        appsrc_el = ["appsrc",
     82                     "name=src",
     83                     "emit-signals=0",
     84                     "block=0",
     85                     "is-live=0",
     86                     "stream-type=stream",
     87                     "format=%s" % GST_FORMAT_BUFFERS]
     88        pipeline_els.append(" ".join(appsrc_el))
    8589        pipeline_els.append(parser)
    8690        pipeline_els.append(decoder_str)
    8791        pipeline_els.append("audioconvert")
     
    106110        self.volume = self.pipeline.get_by_name("volume")
    107111        self.src    = self.pipeline.get_by_name("src")
    108112        self.queue  = self.pipeline.get_by_name("queue")
    109         self.overruns = 0
     113        self.overruns = deque(maxlen=100)
     114        self.underruns = deque(maxlen=100)
    110115        self.queue_state = "starting"
     116        self.last_underrun = None
     117        self.last_overrun = None
    111118        if QUEUE_SILENT==0:
    112119            self.queue.connect("overrun", self.queue_overrun)
    113120            self.queue.connect("underrun", self.queue_underrun)
     
    124131
    125132
    126133    def queue_pushing(self, *args):
    127         ltime = self.queue.get_property("current-level-time")/MS_TO_NS
    128         log("queue pushing: level=%i", ltime)
    129134        self.check_levels("pushing")
    130         return 0
     135        return True
    131136
    132137    def queue_running(self, *args):
    133         ltime = self.queue.get_property("current-level-time")/MS_TO_NS
    134         log("queue running: level=%s", ltime)
    135138        self.check_levels("running")
    136         return 0
     139        return True
    137140
    138141    def check_levels(self, new_state):
    139         if self.queue_state=="underrun":
    140             #lift min time restrictions:
    141             self.queue.set_property("min-threshold-time", 0)
    142         elif self.queue_state == "overrun":
    143             clt = self.queue.get_property("current-level-time")
    144             qpct = min(QUEUE_TIME, clt)*100//QUEUE_TIME
    145             log("resetting max-size-time back to %ims (level=%ims, %i%%)", QUEUE_TIME//MS_TO_NS, clt//MS_TO_NS, qpct)
    146             self.queue.set_property("max-size-time", QUEUE_TIME)
     142        clt = self.queue.get_property("current-level-time")/MS_TO_NS
    147143        self.queue_state = new_state
     144        now = time.time()
     145        #need at least 10 levels in the last 10 seconds:
     146        mintime = now-10
     147        maxtime = now-2
     148        filtered = [v for t,v in list(self.levels) if t>=mintime and t<=maxtime]
     149        if len(filtered)>=10:
     150            maxl = max(filtered)
     151            minl = min(filtered)
     152            #range of the levels recorded:
     153            lrange = maxl-minl
     154            #at least 100ms, and give at least 50ms more than historical data:
     155            target_range = max(60, lrange+50)
     156            #calculate new bounds:
     157            def adjust_val(from_time):
     158                if not from_time:
     159                    return 0
     160                #from 100% down to 0% in 2 seconds after underrun/overrun:
     161                pct = max(0, int((from_time+2-now)*50))
     162                #shift by up to 1/2 of the target range
     163                return min(100, pct*target_range//200)
     164            #target_level +=  - adjust_val(self.last_overrun)
     165            #make sure we don't adjust too low:
     166            mtt = adjust_val(self.last_underrun)
     167            mst = target_range - adjust_val(self.last_overrun)
     168            log("check_levels filtered=%s", filtered[-10:])
     169            log("check_levels state=%-10s level=%3i, range=%3i, setting targets: %3i - %3i", new_state, clt, lrange, mtt, mst)
     170            cmtt = self.queue.get_property("min-threshold-time")
     171            if abs(cmtt-mtt)>=lrange//2:
     172                self.queue.set_property("min-threshold-time", mtt*MS_TO_NS)
     173            cmst = self.queue.get_property("max-size-time")
     174            if abs(cmst-mst)>=lrange//2:
     175                self.queue.set_property("max-size-time", mst*MS_TO_NS)
     176        else:
     177            log("check_levels state=%-10s level=%3i, only %i level recorded", new_state, clt, len(filtered))
    148178        self.emit_info()
    149179
    150180
    151181    def queue_underrun(self, *args):
    152         if self.queue_state=="underrun" or self.queue_state=="starting":
     182        if self.queue_state=="starting":
     183            #ignore for now
    153184            return
    154         ltime = self.queue.get_property("current-level-time")/MS_TO_NS
    155         log("queue underrun: level=%i, previous state=%s", ltime, self.queue_state)
    156         self.queue_state = "underrun"
    157         MIN_QUEUE = QUEUE_TIME//2
    158         mts = self.queue.get_property("min-threshold-time")
    159         if mts==MIN_QUEUE:
    160             return 0
    161         #set min time restrictions to fill up queue:
    162         log("increasing the min-threshold-time to %ims", MIN_QUEUE//MS_TO_NS)
    163         self.queue.set_property("min-threshold-time", MIN_QUEUE)
    164         def restore():
    165             if self.queue.get_property("min-threshold-time")==0:
    166                 log("min-threshold-time already restored!")
    167                 return False
    168             ltime = self.queue.get_property("current-level-time")//MS_TO_NS
    169             if ltime==0:
    170                 log("not restored! (still underrun: %ims)", ltime)
    171                 return True
    172             log("resetting the min-threshold-time back to %ims", 0)
    173             self.queue.set_property("min-threshold-time", 0)
    174             self.queue_state = "running"
    175             return False
    176         glib.timeout_add(1000, restore)
    177         return 0
     185        now = time.time()
     186        if self.last_underrun is None or now-self.last_underrun>2:
     187            self.last_underrun = now
     188        self.underruns.append(now)
     189        self.check_levels("underrun")
     190        return True
    178191
    179192    def queue_overrun(self, *args):
    180         if self.queue_state=="overrun":
    181             return
    182         ltime = self.queue.get_property("current-level-time")//MS_TO_NS
    183         log("queue overrun: level=%i, previous state=%s", ltime, self.queue_state)
    184         self.queue_state = "overrun"
    185         REDUCED_QT = QUEUE_TIME//2
    186         #empty the queue by reducing its max size:
    187         log("queue overrun: halving the max-size-time to %ims", REDUCED_QT//MS_TO_NS)
    188         self.queue.set_property("max-size-time", REDUCED_QT)
    189         self.overruns += 1
    190         self.emit("overrun", ltime)
    191         def restore():
    192             if self.queue.get_property("max-size-time")==QUEUE_TIME:
    193                 log("max-size-time already restored!")
    194                 return False
    195             ltime = self.queue.get_property("current-level-time")//MS_TO_NS
    196             if ltime>=REDUCED_QT:
    197                 log("max-size-time not restored! (still overrun: %ims)", ltime)
    198                 return True
    199             log("raising the max-size-time back to %ims", QUEUE_TIME//MS_TO_NS)
    200             self.queue.set_property("max-size-time", QUEUE_TIME)
    201             self.queue_state = "running"
    202             return False
    203         glib.timeout_add(1000, restore)
    204         return 0
     193        log.info("overrun!")
     194        clt = self.queue.get_property("current-level-time")//MS_TO_NS
     195        self.emit("overrun", clt)
     196        now = time.time()
     197        #grace period of recording overruns:
     198        #(because when we record an overrun, we lower the max-time,
     199        # which causes more overruns!)
     200        if self.last_overrun is None or now-self.last_overrun>2:
     201            self.last_overrun = now
     202        self.overruns.append(now)
     203        self.check_levels("overrun")
     204        return True
    205205
    206206    def eos(self):
    207207        log("eos()")
     
    208208        if self.src:
    209209            self.src.emit('end-of-stream')
    210210        self.cleanup()
    211         return 0
     211        return False
    212212
    213213    def get_info(self):
    214214        info = SoundPipeline.get_info(self)
     
    221221                "max"           : qmax//MS_TO_NS,
    222222                "cur"           : clt//MS_TO_NS,
    223223                "pct"           : min(QUEUE_TIME, clt)*100//qmax,
    224                 "overruns"      : self.overruns,
     224                #"overruns"      : self.overruns,
    225225                "state"         : self.queue_state})
    226226        return info
    227227
     
    247247        if self.push_buffer(buf):
    248248            self.buffer_count += 1
    249249            self.byte_count += len(data)
    250             ltime = self.queue.get_property("current-level-time")//MS_TO_NS
    251             log("pushed %s bytes, new buffer level: %sms, queue state=%s", len(data), ltime, self.queue_state)
     250            clt = self.queue.get_property("current-level-time")//MS_TO_NS
     251            log("pushed %i bytes, new buffer level: %ims, queue state=%s", len(data), clt, self.queue_state)
     252            self.levels.append((time.time(), clt))
     253            if self.queue_state=="pushing":
     254                self.check_levels("pushing")
    252255        self.emit_info()
    253256
    254257    def push_buffer(self, buf):
     
    258261        #buf.offset = offset
    259262        #buf.offset_end = offset_end
    260263        #buf.set_caps(gst.caps_from_string(caps))
     264        clt = self.queue.get_property("current-level-time")//MS_TO_NS
     265        log("before pushing, level=%ims", clt)
    261266        r = self.src.emit("push-buffer", buf)
    262267        if r!=gst.FLOW_OK:
    263268            log.error("push-buffer error: %s", r)