xpra icon
Bug tracker and wiki

Ticket #639: udp-v4.patch

File udp-v4.patch, 63.3 KB (added by Antoine Martin, 2 years ago)

kinda working path - with many limitations

  • xpra/client/client_base.py

     
    232232        raise NotImplementedError()
    233233
    234234    def setup_connection(self, conn):
    235         netlog("setup_connection(%s) timeout=%s", conn, conn.timeout)
    236         self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     235        netlog.info("setup_connection(%s) timeout=%s, socktype=%s", conn, conn.timeout, conn.socktype)
     236        if conn.socktype in ("udp", "dtls"):
     237            from xpra.net.udp_protocol import UDPClientProtocol
     238            self._protocol = UDPClientProtocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     239            self.set_packet_handlers(self._packet_handlers, {
     240                "udp-control"   : self._process_udp_control,
     241                })
     242        else:
     243            self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
    237244        self._protocol.large_packets.append("keymap-changed")
    238245        self._protocol.large_packets.append("server-settings")
    239246        self._protocol.large_packets.append("logging")
     
    254261            getChildReaper().add_process(proc, name, command, ignore=True, forget=False)
    255262        netlog("setup_connection(%s) protocol=%s", conn, self._protocol)
    256263
     264    def _process_udp_control(self, packet):
     265        self._protocol.process_control(*packet[1:])
    257266
     267
    258268    def remove_packet_handlers(self, *keys):
    259269        for k in keys:
    260270            for d in (self._packet_handlers, self._ui_packet_handlers):
  • xpra/net/bytestreams.py

     
    9898            return f(*a, **kw)
    9999        except Exception as e:
    100100            retry = can_retry(e)
     101            log("untilConcludes", exc_info=True)
    101102            log("untilConcludes(%s, %s, %s, %s, %s) %s, retry=%s", is_active_cb, can_retry, f, a, kw, e, retry)
    102103            if retry:
    103104                if wait>0:
     
    277278            i = s
    278279        log("%s.close() for socket=%s", self, i)
    279280        Connection.close(self)
    280         s.settimeout(0)
     281        try:
     282            s.settimeout(0)
     283        except:
     284            pass
    281285        #this is more proper but would break the proxy server:
    282286        #s.shutdown(socket.SHUT_RDWR)
    283287        s.close()
     
    306310        s = self._socket
    307311        if not s:
    308312            return None
    309         return {
    310                 #"class"         : str(type(s)),
    311                 "fileno"        : s.fileno(),
    312                 "timeout"       : int(1000*(s.gettimeout() or 0)),
     313        info = {
    313314                "family"        : FAMILY_STR.get(s.family, s.family),
    314315                "proto"         : s.proto,
    315316                "type"          : PROTOCOL_STR.get(s.type, s.type),
    316317                }
     318        try:
     319            info["timeout"] = int(1000*(s.gettimeout() or 0))
     320        except:
     321            pass
     322        if hasattr(s, "fileno"):
     323            info["fileno"] = s.fileno()
     324        return info
    317325
    318 
    319326try:
    320327    #this wrapper class allows us to override the normal ssl.Socket
    321328    #class so that we can fake peek() support by actually reading from the socket
  • xpra/net/protocol.py

     
    5959    packet_encoding_sanity_checks()
    6060
    6161
     62def force_flush_queue(q):
     63    try:
     64        #discard all elements in the old queue and push the None marker:
     65        try:
     66            while q.qsize()>0:
     67                q.read(False)
     68        except:
     69            pass
     70        q.put_nowait(None)
     71    except:
     72        pass
     73
    6274class Protocol(object):
     75    """
     76        This class handles sending and receiving packets,
     77        it will encode and compress them before sending,
     78        and decompress and decode when receiving.
     79    """
     80
    6381    CONNECTION_LOST = "connection-lost"
    6482    GIBBERISH = "gibberish"
    6583    INVALID = "invalid"
     
    7290        assert conn is not None
    7391        self.timeout_add = scheduler.timeout_add
    7492        self.idle_add = scheduler.idle_add
     93        self.source_remove = scheduler.source_remove
    7594        self._conn = conn
    7695        if FAKE_JITTER>0:
    7796            from xpra.net.fake_jitter import FakeJitter
     
    81100            self._process_packet_cb = process_packet_cb
    82101        self._write_queue = Queue(1)
    83102        self._read_queue = Queue(20)
     103        self._process_read = self.read_queue_put
    84104        self._read_queue_put = self.read_queue_put
    85105        # Invariant: if .source is None, then _source_has_more == False
    86106        self._get_packet_cb = get_packet_cb
     
    279299                return
    280300            self._internal_error("error in network packet write/format", e, exc_info=True)
    281301
    282     def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, has_more=False):
     302    def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, fail_cb=None, has_more=False):
    283303        if has_more:
    284304            self._source_has_more.set()
    285305        if packet is None:
     
    290310            if self._closed:
    291311                return
    292312            try:
    293                 self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb)
     313                self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb, fail_cb)
    294314            except:
    295315                log.error("Error: failed to queue '%s' packet", packet[0])
    296                 log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb), exc_info=True)
     316                log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb, fail_cb), exc_info=True)
    297317                raise
    298318
    299     def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None):
     319    def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None, fail_cb=None, synchronous=True):
    300320        """ the write_lock must be held when calling this function """
    301321        counter = 0
    302322        items = []
    303323        for proto_flags,index,level,data in chunks:
    304             scb, ecb = None, None
    305324            #fire the start_send_callback just before the first packet is processed:
    306             if counter==0:
    307                 scb = start_send_cb
    308325            #fire the end_send callback when the last packet (index==0) makes it out:
    309             if index==0:
    310                 ecb = end_send_cb
    311326            payload_size = len(data)
    312327            actual_size = payload_size
    313328            if self.cipher_out:
     
    328343                assert not self.cipher_out
    329344                #for plain/text packets (ie: gibberish response)
    330345                log("sending %s bytes without header", payload_size)
    331                 items.append((data, scb, ecb))
     346                items.append(data)
    332347            elif actual_size<PACKET_JOIN_SIZE:
    333                 if type(data) not in JOIN_TYPES:
     348                if not isinstance(data, JOIN_TYPES):
    334349                    data = memoryview_to_bytes(data)
    335350                header_and_data = pack_header(proto_flags, level, index, payload_size) + data
    336                 items.append((header_and_data, scb, ecb))
     351                items.append(header_and_data)
    337352            else:
    338353                header = pack_header(proto_flags, level, index, payload_size)
    339                 items.append((header, scb, None))
    340                 items.append((data, None, ecb))
     354                items.append(header)
     355                items.append(data)
    341356            counter += 1
    342357        if self._write_thread is None:
    343358            self.start_write_thread()
    344         self._write_queue.put(items)
    345         self.output_packetcount += 1
     359        self._write_queue.put((items, start_send_cb, end_send_cb, fail_cb, synchronous))
    346360
     361
    347362    def start_write_thread(self):
    348363        self._write_thread = start_thread(self._write_thread_loop, "write", daemon=True)
    349364
    350     def raw_write(self, contents, start_cb=None, end_cb=None):
     365    def raw_write(self, contents, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
    351366        """ Warning: this bypasses the compression and packet encoder! """
    352367        if self._write_thread is None:
    353368            self.start_write_thread()
    354         self._write_queue.put(((contents, start_cb, end_cb), ))
     369        self._write_queue.put((contents, start_cb, end_cb, fail_cb, synchronous))
    355370
    356371    def verify_packet(self, packet):
    357372        """ look for None values which may have caused the packet to fail encoding """
     
    539554        assert level>=0 and level<=10, "invalid compression level: %s (must be between 0 and 10" % level
    540555        self.compression_level = level
    541556
     557
    542558    def _io_thread_loop(self, name, callback):
    543559        try:
    544560            log("io_thread_loop(%s, %s) loop starting", name, callback)
     
    559575                log.error("Error: %s on %s failed: %s", name, self._conn, type(e), exc_info=True)
    560576                self.close()
    561577
     578
    562579    def _write_thread_loop(self):
    563580        self._io_thread_loop("write", self._write)
    564581    def _write(self):
     
    568585            log("write thread: empty marker, exiting")
    569586            self.close()
    570587            return False
    571         for buf, start_cb, end_cb in items:
    572             con = self._conn
    573             if not con:
    574                 return False
    575             if start_cb:
    576                 try:
    577                     start_cb(con.output_bytecount)
    578                 except:
    579                     if not self._closed:
    580                         log.error("Error on write start callback %s", start_cb, exc_info=True)
     588        return self.write_items(*items)
     589   
     590    def write_items(self, buf_data, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
     591        con = self._conn
     592        if not con:
     593            return False
     594        if start_cb:
     595            try:
     596                start_cb(con.output_bytecount)
     597            except:
     598                if not self._closed:
     599                    log.error("Error on write start callback %s", start_cb, exc_info=True)
     600        self.write_buffers(buf_data, fail_cb, synchronous)
     601        if end_cb:
     602            try:
     603                end_cb(self._conn.output_bytecount)
     604            except:
     605                if not self._closed:
     606                    log.error("Error on write end callback %s", end_cb, exc_info=True)
     607        return True
     608
     609    def write_buffers(self, buf_data, _fail_cb, _synchronous):
     610        con = self._conn
     611        if not con:
     612            return 0
     613        for buf in buf_data:
    581614            while buf and not self._closed:
    582615                written = con.write(buf)
    583616                #example test code, for sending small chunks very slowly:
     
    587620                if written:
    588621                    buf = buf[written:]
    589622                    self.output_raw_packetcount += 1
    590             if end_cb:
    591                 try:
    592                     end_cb(self._conn.output_bytecount)
    593                 except:
    594                     if not self._closed:
    595                         log.error("Error on write end callback %s", end_cb, exc_info=True)
    596         return True
     623        self.output_packetcount += 1
    597624
     625
    598626    def _read_thread_loop(self):
    599627        self._io_thread_loop("read", self._read)
    600628    def _read(self):
     
    601629        buf = self._conn.read(READ_BUFFER_SIZE)
    602630        #log("read thread: got data of size %s: %s", len(buf), repr_ellipsized(buf))
    603631        #add to the read queue (or whatever takes its place - see steal_connection)
    604         self._read_queue_put(buf)
     632        self._process_read(buf)
    605633        if not buf:
    606634            log("read thread: eof")
    607635            #give time to the parse thread to call close itself
     
    646674    def _invalid_header(self, data, msg=""):
    647675        self.invalid_header(self, data, msg)
    648676
    649     def invalid_header(self, proto, data, msg="invalid packet header"):
     677    def invalid_header(self, _proto, data, msg="invalid packet header"):
    650678        err = "%s: '%s'" % (msg, binascii.hexlify(data[:8]))
    651679        if len(data)>1:
    652680            err += " read buffer=%s (%i bytes)" % (repr_ellipsized(data), len(data))
     
    653681        self.gibberish(err, data)
    654682
    655683
     684    def process_read(self, data):
     685        self._read_queue_put(data)
     686
    656687    def read_queue_put(self, data):
    657688        #start the parse thread if needed:
    658689        if not self._read_parser_thread and not self._closed:
     
    660691                log("empty marker in read queue, exiting")
    661692                self.idle_add(self.close)
    662693                return
    663             self._read_parser_thread = make_thread(self._read_parse_thread_loop, "parse", daemon=True)
    664             self._read_parser_thread.start()
     694            self.start_read_parser_thread()
    665695        self._read_queue.put(data)
    666696        #from now on, take shortcut:
    667697        if self._read_queue_put==self.read_queue_put:
    668698            self._read_queue_put = self._read_queue.put
    669699
     700    def start_read_parser_thread(self):
     701        self._read_parser_thread = start_thread(self._read_parse_thread_loop, "parse", daemon=True)
     702
    670703    def _read_parse_thread_loop(self):
    671704        log("read_parse_thread_loop starting")
    672705        try:
     
    911944                        close_and_release()
    912945                        return False
    913946                    return not self._closed     #run until we manage to close (here or via the timeout)
    914                 def packet_queued(*args):
     947                def packet_queued(*_args):
    915948                    #if we're here, we have the lock and the packet is in the write queue
    916949                    log("flush_then_close: packet_queued() closed=%s", self._closed)
    917950                    if wait_for_packet_sent():
     
    952985        self.idle_add(self._process_packet_cb, self, [Protocol.CONNECTION_LOST])
    953986        c = self._conn
    954987        if c:
     988            self._conn = None
    955989            try:
    956990                log("Protocol.close() calling %s", c.close)
    957991                c.close()
    958                 if self._log_stats is None and self._conn.input_bytecount==0 and self._conn.output_bytecount==0:
     992                if self._log_stats is None and c.input_bytecount==0 and c.output_bytecount==0:
    959993                    #no data sent or received, skip logging of stats:
    960994                    self._log_stats = False
    961995                if self._log_stats:
    962996                    from xpra.simple_stats import std_unit, std_unit_dec
    963997                    log.info("connection closed after %s packets received (%s bytes) and %s packets sent (%s bytes)",
    964                          std_unit(self.input_packetcount), std_unit_dec(self._conn.input_bytecount),
    965                          std_unit(self.output_packetcount), std_unit_dec(self._conn.output_bytecount)
     998                         std_unit(self.input_packetcount), std_unit_dec(c.input_bytecount),
     999                         std_unit(self.output_packetcount), std_unit_dec(c.output_bytecount)
    9661000                         )
    9671001            except:
    968                 log.error("error closing %s", self._conn, exc_info=True)
    969             self._conn = None
     1002                log.error("error closing %s", c, exc_info=True)
    9701003        self.terminate_queue_threads()
    9711004        self.idle_add(self.clean)
    9721005        log("Protocol.close() done")
     
    10101043        exit_queue = Queue()
    10111044        for _ in range(10):     #just 2 should be enough!
    10121045            exit_queue.put(None)
    1013         try:
    1014             owq = self._write_queue
    1015             self._write_queue = exit_queue
    1016             #discard all elements in the old queue and push the None marker:
    1017             try:
    1018                 while owq.qsize()>0:
    1019                     owq.read(False)
    1020             except:
    1021                 pass
    1022             owq.put_nowait(None)
    1023         except:
    1024             pass
    1025         try:
    1026             orq = self._read_queue
    1027             self._read_queue = exit_queue
    1028             #discard all elements in the old queue and push the None marker:
    1029             try:
    1030                 while orq.qsize()>0:
    1031                     orq.read(False)
    1032             except:
    1033                 pass
    1034             orq.put_nowait(None)
    1035         except:
    1036             pass
     1046        #write queue:
     1047        owq = self._write_queue
     1048        self._write_queue = exit_queue
     1049        force_flush_queue(owq)
     1050        #read queue:
     1051        orq = self._read_queue
     1052        self._read_queue = exit_queue
     1053        force_flush_queue(orq)
    10371054        #just in case the read thread is waiting again:
    10381055        self._source_has_more.set()
  • xpra/net/udp_protocol.py

     
     1# This file is part of Xpra.
     2# Copyright (C) 2017 Antoine Martin <antoine@devloop.org.uk>
     3# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
     4# later version. See the file COPYING for details.
     5
     6import socket
     7import struct
     8import random
     9try:
     10    import errno
     11    EMSGSIZE = errno.EMSGSIZE
     12except ImportError as e:
     13    EMSGSIZE = None
     14
     15from xpra.log import Logger
     16log = Logger("network", "protocol")
     17
     18from xpra.os_util import LINUX, monotonic_time
     19from xpra.util import envint, repr_ellipsized
     20from xpra.make_thread import start_thread
     21from xpra.net.protocol import Protocol
     22from xpra.net.bytestreams import SocketConnection
     23
     24READ_BUFFER_SIZE = envint("XPRA_READ_BUFFER_SIZE", 65536)
     25DROP_PCT = envint("XPRA_UDP_DROP_PCT", 0)
     26
     27
     28#UUID, seqno, chunk, chunks
     29_header_struct = struct.Struct('!QQHHH')
     30_header_size = _header_struct.size
     31
     32
     33class IncompletePacket(object):
     34    def __init__(self, seqno, start_time, chunks=None):
     35        self.seqno = seqno
     36        self.start_time = start_time
     37        self.last_time = start_time
     38        #todo: use numpy array of bytes
     39        self.chunks = chunks
     40    def __repr__(self):
     41        return ("IncompletePacket(%i: %s chunks)" % (self.seqno, len(self.chunks or [])))
     42
     43
     44class UDPListener(object):
     45    """
     46        This class is used by servers to receive UDP packets,
     47        it parses the header and then exposes the data received via process_packet_cb.
     48    """
     49
     50    def __init__(self, sock, process_packet_cb):
     51        assert sock is not None
     52        self._closed = False
     53        self._socket = sock
     54        self._process_packet_cb =  process_packet_cb
     55        self._read_thread = start_thread(self._read_thread_loop, "read", daemon=True)
     56
     57    def __repr__(self):
     58        return "UDPListener(%s)" % self._socket
     59
     60    def _read_thread_loop(self):
     61        log.info("udp read thread loop starting")
     62        try:
     63            while not self._closed:
     64                buf, bfrom = self._socket.recvfrom(READ_BUFFER_SIZE)
     65                if not buf:
     66                    log("read thread: eof")
     67                    break
     68                values = list(_header_struct.unpack_from(buf[:_header_size])) + [buf[_header_size:], bfrom]
     69                try:
     70                    self._process_packet_cb(self, *values)
     71                except Exception as e:
     72                    log("_read_thread_loop() buffer=%s, from=%s", repr_ellipsized(buf), bfrom, exc_info=True)
     73                    if not self._closed:
     74                        log.error("Error: UDP packet processing error:")
     75                        log.error(" %s", e)
     76        except Exception as e:
     77            #can happen during close(), in which case we just ignore:
     78            if not self._closed:
     79                log.error("Error: read on %s failed: %s", self._socket, type(e), exc_info=True)
     80        log("udp read thread loop ended")
     81        self.close()
     82
     83    def close(self):
     84        s = self._socket
     85        log("UDPListener.close() closed=%s, socket=%s", self._closed, s)
     86        if self._closed:
     87            return
     88        self._closed = True
     89        if s:
     90            try:
     91                log("Protocol.close() calling %s", s.close)
     92                s.close()
     93            except:
     94                log.error("error closing %s", s, exc_info=True)
     95            self._socket = None
     96        log("UDPListener.close() done")
     97
     98
     99class UDPProtocol(Protocol):
     100    """
     101        This class extends the Protocol class with UDP encapsulation.
     102        A single packet may end up being fragmented into multiple UDP frames
     103        to fit in the MTU.
     104        We keep track of the function which can be used to handle send failures
     105        (or the packet data if no function is supplied).
     106    """
     107
     108    def __init__(self, *args):
     109        Protocol.__init__(self, *args)
     110        self.mtu = 0
     111        self.last_sequence = -1     #the most recent packet sequence we processed in full
     112        self.highest_sequence = -1
     113        self.fail_cb = {}
     114        self.incomplete_packets = {}
     115        self.can_skip = set()
     116        self.cancel = set()
     117        self.control_timer = self.timeout_add(2000, self.send_control)
     118        self._process_read = self.process_read
     119
     120    def close(self):
     121        Protocol.close(self)
     122        ct = self.control_timer
     123        if ct:
     124            self.control_timer = None
     125            self.source_remove(ct)
     126
     127    def send_control(self):
     128        if self._closed:
     129            self.control_timer = None
     130            return False
     131        missing = {}
     132        if self.incomplete_packets:
     133            now = monotonic_time()
     134            max_start_time = now-0.020
     135            late_start_time = now-2
     136            not_recent = now-0.5
     137            for seqno, ip in self.incomplete_packets.items():
     138                st = ip.start_time
     139                if st>=max_start_time:
     140                    continue        #too recent, may still arrive
     141                if st<late_start_time or ip.last_time<not_recent:
     142                    if ip.chunks is None:
     143                        missing[seqno] = []
     144                    else:
     145                        #TODO: use bitmap instead?
     146                        missing_chunks = [i for i,x in enumerate(ip.chunks) if x is None]
     147                        if missing_chunks:
     148                            missing[seqno] = missing_chunks
     149        packet = ("udp-control", self.mtu, self.last_sequence, self.highest_sequence, missing, tuple(self.cancel))
     150        log.info("send_control() packet(%s)=%s", self.incomplete_packets, packet)
     151        self.send_async(packet)
     152        self.cancel = set()
     153        return True
     154
     155    def send_async(self, packet):
     156        chunks = self.encode(packet)
     157        if len(chunks)>1:
     158            return Protocol.send_now(packet)
     159        proto_flags,index,level,data = chunks[0]
     160        from xpra.net.header import pack_header
     161        payload_size = len(data)
     162        header_and_data = pack_header(proto_flags, level, index, payload_size) + data
     163        with self._write_lock:
     164            if self._write_thread is None:
     165                self.start_write_thread()
     166            self._write_queue.put((header_and_data, None, None, None, False))
     167
     168    def process_udp_data(self, seqno, synchronous, chunk, chunks, data, bfrom):
     169        log.info("process_udp_data%s %i bytes", (seqno, synchronous, chunk, chunks, repr_ellipsized(data), bfrom), len(data))
     170        if DROP_PCT>0:
     171            if random.randint(0, 100) <= DROP_PCT:
     172                log.warn("Warning: dropping udp packet %i.%i", seqno, chunk)
     173                return
     174        if seqno<=self.last_sequence:
     175            #must be a duplicate, we've already processed it!
     176            return
     177        self.highest_sequence = max(self.highest_sequence, seqno)
     178        if self.incomplete_packets or (synchronous and seqno!=self.last_sequence+1) or chunk!=0 or chunks!=1:
     179            assert chunk>=0 and chunks>0 and chunk<chunks, "invalid chunk: %i/%i" % (chunk, chunks)
     180            #slow path: add chunk to incomplete packet
     181            now = monotonic_time()
     182            ip = self.incomplete_packets.get(seqno)
     183            if not ip or not ip.chunks:
     184                chunks_array = [None for _ in range(chunks)]
     185                ip = IncompletePacket(seqno, now, chunks_array)
     186                self.incomplete_packets[seqno] = ip
     187            else:
     188                ip.last_time = now
     189            ip.chunks[chunk] = data
     190            if seqno!=self.last_sequence+1:
     191                #we're waiting for a packet and this is not it,
     192                #make sure any gaps are marked as incomplete:
     193                for i in range(self.last_sequence+1, seqno):
     194                    if i not in self.incomplete_packets and i not in self.can_skip:
     195                        self.incomplete_packets[i] = IncompletePacket(i, now)
     196                if synchronous:
     197                    #so we're done here
     198                    log("process_udp_data: we're waiting for %i, not %i", self.last_sequence+1, seqno)
     199                    return
     200            if any(x is None for x in ip.chunks):
     201                #one of the chunks is still missing
     202                log("process_udp_data: sequence %i is still missing some chunks: %s", seqno, [i for i,x in enumerate(ip.chunks) if x is None])
     203                return
     204            #all the data is here!
     205            del self.incomplete_packets[seqno]
     206            data = b"".join(ip.chunks)
     207        log.info("process_udp_data: adding packet sequence %i to read queue", seqno)
     208        if seqno==self.last_sequence+1:
     209            self.last_sequence = seqno
     210        else:
     211            assert not synchronous
     212            self.can_skip.add(seqno)
     213        self._read_queue_put(data)
     214        if self.incomplete_packets:
     215            self.process_incomplete()
     216
     217    def process_incomplete(self):
     218        #maybe we can send the next one(s) now?
     219        seqno = self.last_sequence
     220        log("process_incomplete() last_sequence=%i, can skip=%s", seqno, self.can_skip)
     221        while True:
     222            seqno += 1
     223            if seqno in self.can_skip:
     224                try:
     225                    del self.incomplete_packets[seqno]
     226                except KeyError:
     227                    pass
     228                self.can_skip.remove(seqno)
     229                self.last_sequence = seqno
     230                continue
     231            ip = self.incomplete_packets.get(seqno)
     232            if not ip or not ip.chunks:
     233                #it's missing, we just don't know how many chunks
     234                return
     235            if any(x is None for x in ip.chunks):
     236                #one of the chunks is still missing
     237                return
     238            #all the data is here!
     239            del self.incomplete_packets[seqno]
     240            data = b"".join(ip.chunks)
     241            log.info("process_udp_data: adding packet sequence %i to read queue", seqno)
     242            self.last_sequence = seqno
     243            self._read_queue_put(data)
     244               
     245
     246    def write_buffers(self, buf_data, fail_cb, synchronous):
     247        buf = b"".join(buf_data)
     248        #if not isinstance(buf, JOIN_TYPES):
     249        #    buf = memoryview_to_bytes(buf)
     250        while True:
     251            try:
     252                seqno = self.output_packetcount
     253                return self.do_con_write(seqno, buf, fail_cb, synchronous)
     254            except MTUExceeded as e:
     255                log.warn("%s: %s", e, self.mtu)
     256                if self.mtu>576:
     257                    self.mtu //= 2
     258                raise
     259
     260    def do_con_write(self, seqno, data, fail_cb, synchronous):
     261        con = self._conn
     262        if not con:
     263            return 0
     264        #TODO: bump to 1280 for IPv6
     265        #mtu = max(576, self.mtu)
     266        mtu = max(1280, self.mtu)
     267        uuid = 0 #todo!
     268        l = len(data)
     269        maxpayload = mtu-_header_size
     270        chunks = l // maxpayload
     271        if l % maxpayload > 0:
     272            chunks += 1
     273        log.info("UDP.con_write(%s, %i bytes, %s, %s) seq=%i, mtu=%s, maxpayload=%i, chunks=%i, data=%s", con, l, fail_cb, synchronous, seqno, mtu, maxpayload, chunks, repr_ellipsized(data))
     274        chunk = 0
     275        offset = 0
     276        chunk_resend_cache = None
     277        if fail_cb:
     278            self.fail_cb[seqno] = fail_cb
     279        else:
     280            chunk_resend_cache = {}
     281            self.fail_cb[seqno] = chunk_resend_cache
     282        while offset<l:
     283            assert chunk<chunks
     284            pl = min(maxpayload, l-offset)
     285            data_chunk = data[offset:offset+pl]
     286            udp_data = _header_struct.pack(uuid, seqno, synchronous, chunk, chunks) + data_chunk
     287            assert len(udp_data)<=mtu, "invalid payload size: %i greater than mtu %i" % (len(udp_data), mtu)
     288            con.write(udp_data)
     289            self.output_raw_packetcount += 1
     290            offset += pl
     291            if chunk_resend_cache is not None:
     292                chunk_resend_cache[chunk] = udp_data
     293            chunk += 1
     294        assert chunk==chunks, "wrote %i chunks but expected %i" % (chunk, chunks)
     295        self.output_packetcount += 1
     296        return offset
     297
     298    def process_control(self, mtu, last_seq, high_seq, missing, cancel):
     299        log.warn("process_control(%i, %i, %i, %s, %s)", mtu, last_seq, high_seq, missing, cancel)
     300        con = self._conn
     301        if not con:
     302            return
     303        if mtu and self.mtu==0:
     304            self.mtu = mtu
     305        #first, we can free all the packets that aren't missing:
     306        if last_seq>=0:
     307            done = [x for x in self.fail_cb.keys() if x<=last_seq]
     308            for x in done:
     309                try:
     310                    del self.fail_cb[x]
     311                except:
     312                    pass
     313        #next we can forget about sequence numbers that have been cancelled:
     314        if cancel:
     315            for seqno in cancel:
     316                if seqno>self.last_sequence:
     317                    self.can_skip.add(seqno)
     318                try:
     319                    del self.incomplete_packets[seqno]
     320                except:
     321                    pass
     322            if self.incomplete_packets or self.can_skip:
     323                self.process_incomplete()
     324        #re-send the missing ones:
     325        for seqno, missing_chunks in missing.items():
     326            fail_cb_seq = self.fail_cb.get(seqno)
     327            if fail_cb_seq is None:
     328                log.error("Error: cannot resend packet sequence %i", seqno)
     329                #hope for the best, and tell the other end to stop asking:
     330                self.cancel.add(seqno)
     331                continue
     332            log("fail_cb[%i]=%s", seqno, repr_ellipsized(str(fail_cb_seq)))
     333            if callable(fail_cb_seq):
     334                self.cancel.add(seqno)
     335                fail_cb_seq()
     336                continue
     337            if len(missing_chunks)==0:
     338                #the other end only knows it is missing the seqno,
     339                #not how many chunks are missing, so send them all
     340                missing_chunks = fail_cb_seq.keys()
     341            for c in missing_chunks:
     342                data = fail_cb_seq.get(c)
     343                log("resend data[%i][%i]=%s", seqno, c, repr_ellipsized(str(data)))
     344                if data is None:
     345                    log.error("Error: cannot resend chunk %i of packet sequence %i", c, seqno)
     346                    log.error(" data missing from packet resend cache")
     347                    continue
     348                #send it again:
     349                con.write(data)
     350
     351
     352    def get_info(self, alias_info=True):
     353        i = Protocol.get_info(self, alias_info)
     354        i["mtu"] = self.mtu
     355        return i
     356
     357
     358class UDPServerProtocol(UDPProtocol):
     359
     360    def _read_thread_loop(self):
     361        #server protocol is not used to read,
     362        #we rely on the listener to dispatch packets instead
     363        pass
     364
     365class UDPClientProtocol(UDPProtocol):
     366
     367    def con_write(self, data, fail_cb):
     368        """ After successfully writing some data, update the mtu value """
     369        r = UDPProtocol.con_write(self, data, fail_cb)
     370        if r>0 and LINUX:
     371            IP_MTU = 14
     372            con = self._conn
     373            if con:
     374                try:
     375                    self.mtu = min(32767, con._socket.getsockopt(socket.IPPROTO_IP, IP_MTU))
     376                    #log("mtu=%s", self.mtu)
     377                except IOError as e:
     378                    pass
     379        return r
     380
     381    def process_read(self, buf):
     382        """ Splits and parses the UDP frame header from the packet """
     383        #log.info("UDPClientProtocol.read_queue_put(%s)", repr_ellipsized(buf))
     384        _, seqno, synchronous, chunk, chunks = _header_struct.unpack_from(buf[:_header_size])
     385        data = buf[_header_size:]
     386        bfrom = None        #not available here..
     387        self.process_udp_data(seqno, synchronous, chunk, chunks, data, bfrom)
     388
     389
     390class UDPSocketConnection(SocketConnection):
     391    """
     392        This class extends SocketConnection to use socket.sendto
     393        to send data to the correct destination.
     394        (servers use a single socket to talk to multiple clients,
     395        they do not call connect() and so we have to specify the remote target every time)
     396    """
     397
     398    def __init__(self, *args):
     399        SocketConnection.__init__(self, *args)
     400
     401    def write(self, buf):
     402        #log("UDPSocketConnection: sending %i bytes to %s", len(buf), self.remote)
     403        try:
     404            return self._socket.sendto(buf, self.remote)
     405        except IOError as e:
     406            if e.errno==EMSGSIZE:
     407                raise MTUExceeded("invalid UDP payload size, cannot send %i bytes: %s" % (len(buf), e))
     408            raise
     409
     410    def close(self):
     411        """
     412            don't close the socket, we're don't own it
     413        """
     414        pass
     415
     416class MTUExceeded(IOError):
     417    pass
  • xpra/scripts/config.py

     
    437437                    "auth"              : str,
    438438                    "vsock-auth"        : str,
    439439                    "tcp-auth"          : str,
     440                    "udp-auth"          : str,
     441                    "dtls-auth"         : str,
    440442                    "ws-auth"           : str,
    441443                    "wss-auth"          : str,
    442444                    "ssl-auth"          : str,
     
    594596                    "bind"              : list,
    595597                    "bind-vsock"        : list,
    596598                    "bind-tcp"          : list,
     599                    "bind-udp"          : list,
     600                    "bind-dtls"         : list,
    597601                    "bind-ws"           : list,
    598602                    "bind-wss"          : list,
    599603                    "bind-ssl"          : list,
     
    615619    "start-after-connect", "start-child-after-connect",
    616620    "start-on-connect", "start-child-on-connect",
    617621    ]
    618 BIND_OPTIONS = ["bind", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
     622BIND_OPTIONS = ["bind", "bind-tcp", "bind-udp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
    619623
    620624#keep track of the options added since v1,
    621625#so we can generate command lines that work with older supported versions:
     
    679683    "av-sync", "global-menus",
    680684    "printing", "file-transfer", "open-command", "open-files", "start-new-commands",
    681685    "mmap", "mmap-group", "mdns",
    682     "auth", "vsock-auth", "tcp-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
    683     "bind", "bind-vsock", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
     686    "auth", "vsock-auth", "tcp-auth", "udp-auth", "dtls-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
     687    "bind", "bind-vsock", "bind-tcp", "bind-udp", "bind-dtls", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
    684688    "start", "start-child",
    685689    "start-after-connect", "start-child-after-connect",
    686690    "start-on-connect", "start-child-on-connect",
     
    799803                    "auth"              : "",
    800804                    "vsock-auth"        : "",
    801805                    "tcp-auth"          : "",
     806                    "udp-auth"          : "",
     807                    "dtls-auth"         : "",
    802808                    "ws-auth"           : "",
    803809                    "wss-auth"          : "",
    804810                    "ssl-auth"          : "",
     
    946952                    "bind"              : bind_dirs,
    947953                    "bind-vsock"        : [],
    948954                    "bind-tcp"          : [],
     955                    "bind-udp"          : [],
     956                    "bind-dtls"         : [],
    949957                    "bind-ws"           : [],
    950958                    "bind-wss"          : [],
    951959                    "bind-ssl"          : [],
  • xpra/scripts/main.py

     
    534534                          metavar="[HOST]:[PORT]",
    535535                          help="Listen for connections over TCP (use --tcp-auth to secure it)."
    536536                            + " You may specify this option multiple times with different host and port combinations")
     537        group.add_option("--bind-udp", action="append",
     538                          dest="bind_udp", default=list(defaults.bind_udp or []),
     539                          metavar="[HOST]:[PORT]",
     540                          help="Listen for connections over UDP (use --udp-auth to secure it)."
     541                            + " You may specify this option multiple times with different host and port combinations")
     542        group.add_option("--bind-dtls", action="append",
     543                          dest="bind_dtls", default=list(defaults.bind_dtls or []),
     544                          metavar="[HOST]:[PORT]",
     545                          help="Listen for connections over UDP + DTLS (use --dtls-auth to secure it)."
     546                            + " You may specify this option multiple times with different host and port combinations")
    537547        group.add_option("--bind-ws", action="append",
    538548                          dest="bind_ws", default=list(defaults.bind_ws or []),
    539549                          metavar="[HOST]:[PORT]",
     
    558568        ignore({
    559569            "bind"      : defaults.bind,
    560570            "bind-tcp"  : defaults.bind_tcp,
     571            "bind-udp"  : defaults.bind_udp,
    561572            "bind-ws"   : defaults.bind_ws,
    562573            "bind-wss"  : defaults.bind_wss,
    563574            "bind-ssl"  : defaults.bind_ssl,
     
    974985    group.add_option("--tcp-auth", action="store",
    975986                      dest="tcp_auth", default=defaults.tcp_auth,
    976987                      help="The authentication module to use for TCP sockets (default: '%default')")
     988    group.add_option("--udp-auth", action="store",
     989                      dest="udp_auth", default=defaults.udp_auth,
     990                      help="The authentication module to use for UDP sockets (default: '%default')")
    977991    group.add_option("--ws-auth", action="store",
    978992                      dest="ws_auth", default=defaults.ws_auth,
    979993                      help="The authentication module to use for Websockets (default: '%default')")
     
    16861700        if opts.socket_dir:
    16871701            desc["socket_dir"] = opts.socket_dir
    16881702        return desc
    1689     elif display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
    1690             display_name.startswith("ssl:") or display_name.startswith("ssl/"):
    1691         ctype = display_name[:3]        #ie: "ssl" or "tcp"
    1692         separator = display_name[3]     # ":" or "/"
     1703    elif (
     1704        display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
     1705        display_name.startswith("ssl:") or display_name.startswith("ssl/") or \
     1706        display_name.startswith("udp:") or display_name.startswith("udp/") or \
     1707        display_name.startswith("dtls:") or display_name.startswith("dtls/")
     1708        ):
     1709        ctype = display_name[:4].rstrip(":/")   #ie: "ssl" or "tcp"
     1710        separator = display_name[len(ctype)]     # ":" or "/"
    16931711        desc.update({
    16941712                     "type"     : ctype,
    16951713                     })
     
    20732091        from xpra.net.bytestreams import SocketConnection
    20742092        return SocketConnection(sock, "local", "host", (CID_TYPES.get(cid, cid), iport), dtype)
    20752093
    2076     elif dtype in ("tcp", "ssl", "ws", "wss"):
     2094    elif dtype in ("tcp", "ssl", "ws", "wss", "udp", "dtls"):
    20772095        if display_desc.get("ipv6"):
    20782096            assert socket.has_ipv6, "no IPv6 support"
    20792097            family = socket.AF_INET6
     
    20892107                socket.AF_INET  : "IPv4",
    20902108                }.get(family, family), (host, port), e))
    20912109        sockaddr = addrinfo[0][-1]
    2092         sock = socket.socket(family, socket.SOCK_STREAM)
    2093         sock.settimeout(SOCKET_TIMEOUT)
    2094         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
     2110        if dtype in ("udp", "dtls"):
     2111            sock = socket.socket(family, socket.SOCK_DGRAM)
     2112        else:
     2113            sock = socket.socket(family, socket.SOCK_STREAM)
     2114            sock.settimeout(SOCKET_TIMEOUT)
     2115            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
    20952116        strict_host_check = display_desc.get("strict-host-check")
    20962117        if strict_host_check is False:
    20972118            opts.ssl_server_verify_mode = "none"
    20982119        conn = _socket_connect(sock, sockaddr, display_name, dtype)
    2099         if dtype in ("ssl", "wss"):
     2120        if dtype in ("ssl", "wss", "dtls"):
     2121            if dtype=="dtls":
     2122                from dtls import do_patch   #@UnresolvedImport
     2123                do_patch()
    21002124            wrap_socket = ssl_wrap_socket_fn(opts, server_side=False)
    21012125            sock = wrap_socket(sock)
    21022126            assert sock, "failed to wrap socket %s" % sock
     
    22712295        raise InitException("cannot check hostname with verify mode %s" % verify_mode)
    22722296    wrap_socket = context.wrap_socket
    22732297    del opts
    2274     def do_wrap_socket(tcp_socket):
     2298    def do_wrap_socket(tcp_socket, handshake=None):
    22752299        from xpra.log import Logger
    22762300        try:
    22772301            ssl_sock = wrap_socket(tcp_socket, **kwargs)
     
    22812305            if SSLEOFError and isinstance(e, SSLEOFError):
    22822306                return None
    22832307            raise InitExit(EXIT_SSL_FAILURE, "Cannot wrap socket %s: %s" (tcp_socket, e))
    2284         if not server_side:
     2308        if not server_side and handshake is not False:
    22852309            try:
    22862310                ssl_sock.do_handshake(True)
    22872311            except Exception as e:
  • xpra/scripts/server.py

     
    357357    if opts.encoding=="help" or "help" in opts.encodings:
    358358        return show_encoding_help(opts)
    359359
    360     from xpra.server.socket_util import parse_bind_tcp, parse_bind_vsock
    361     bind_tcp = parse_bind_tcp(opts.bind_tcp)
    362     bind_ssl = parse_bind_tcp(opts.bind_ssl)
    363     bind_ws = parse_bind_tcp(opts.bind_ws)
    364     bind_wss = parse_bind_tcp(opts.bind_wss)
    365     bind_rfb = parse_bind_tcp(opts.bind_rfb)
     360    from xpra.server.socket_util import parse_bind_ip, parse_bind_vsock
     361    bind_tcp = parse_bind_ip(opts.bind_tcp)
     362    bind_udp = parse_bind_ip(opts.bind_udp)
     363    bind_dtls= parse_bind_ip(opts.bind_dtls)
     364    bind_ssl = parse_bind_ip(opts.bind_ssl)
     365    bind_ws  = parse_bind_ip(opts.bind_ws)
     366    bind_wss = parse_bind_ip(opts.bind_wss)
     367    bind_rfb = parse_bind_ip(opts.bind_rfb)
    366368    bind_vsock = parse_bind_vsock(opts.bind_vsock)
    367369
    368370    assert mode in ("start", "start-desktop", "upgrade", "shadow", "proxy")
     
    567569    sockets = []
    568570
    569571    #SSL sockets:
    570     wrap_socket_fn = None
     572    wrap_server_socket_fn = None
     573    wrap_client_socket_fn = None
    571574    need_ssl = False
    572575    ssl_opt = opts.ssl.lower()
    573576    if ssl_opt in TRUE_OPTIONS or bind_ssl or bind_wss:
     
    582585    if need_ssl:
    583586        from xpra.scripts.main import ssl_wrap_socket_fn
    584587        try:
    585             wrap_socket_fn = ssl_wrap_socket_fn(opts, server_side=True)
    586             netlog("wrap_socket_fn=%s", wrap_socket_fn)
     588            wrap_server_socket_fn = ssl_wrap_socket_fn(opts, server_side=True)
     589            wrap_client_socket_fn = ssl_wrap_socket_fn(opts, server_side=False)
     590            netlog("wrap socket functions: %s, %s", wrap_server_socket_fn, wrap_client_socket_fn)
    587591        except Exception as e:
    588592            netlog("SSL error", exc_info=True)
    589593            cpaths = csv("'%s'" % x for x in (opts.ssl_cert, opts.ssl_key) if x)
    590594            raise InitException("cannot create SSL socket, check your certificate paths (%s): %s" % (cpaths, e))
    591595
     596    from xpra.server.socket_util import setup_tcp_socket, setup_udp_socket, setup_vsock_socket, setup_local_sockets
    592597    def add_mdns(socktype, host, port):
    593598        recs = mdns_recs.setdefault(socktype.lower(), [])
    594599        rec = (host, port)
     
    598603        socket = setup_tcp_socket(host, iport, socktype)
    599604        sockets.append(socket)
    600605        add_mdns(socktype, host, iport)
    601 
     606    def add_udp_socket(socktype, host, iport):
     607        socket = setup_udp_socket(host, iport, socktype)
     608        if socktype=="dtls":
     609            from dtls import do_patch   #@UnresolvedImport
     610            do_patch()
     611        sockets.append(socket)
     612        add_mdns(socktype, host, iport)
    602613    # Initialize the TCP sockets before the display,
    603614    # That way, errors won't make us kill the Xvfb
    604615    # (which may not be ours to kill at that point)
    605     from xpra.server.socket_util import setup_tcp_socket, setup_vsock_socket, setup_local_sockets
    606616    netlog("setting up SSL sockets: %s", bind_ssl)
    607617    for host, iport in bind_ssl:
    608618        add_tcp_socket("SSL", host, iport)
     
    615625        add_tcp_socket("tcp", host, iport)
    616626        if tcp_ssl:
    617627            add_mdns("ssl", host, iport)
     628    netlog("setting up UDP sockets: %s", bind_udp)
     629    for host, iport in bind_udp:
     630        add_udp_socket("udp", host, iport)
     631    netlog("setting up UDP+DTLS sockets: %s", bind_dtls)
     632    for host, iport in bind_dtls:
     633        add_udp_socket("dtls", host, iport)
    618634    netlog("setting up http / ws (websockets): %s", bind_ws)
    619635    for host, iport in bind_ws:
    620636        add_tcp_socket("ws", host, iport)
     
    908924            mdns_publish(display_name, mode, listen_on, mdns_info)
    909925
    910926    try:
    911         app._ssl_wrap_socket = wrap_socket_fn
     927        app._ssl_wrap_server_socket = wrap_server_socket_fn
     928        app._ssl_wrap_client_socket = wrap_client_socket_fn
    912929        app.original_desktop_display = desktop_display
    913930        app.exec_cwd = opts.chdir or cwd
    914931        app.init(opts)
  • xpra/server/server_core.py

     
    146146        self.ws_auth_class = None
    147147        self.wss_auth_class = None
    148148        self.ssl_auth_class = None
     149        self.udp_auth_class = None
     150        self.dtls_auth_class = None
    149151        self.rfb_auth_class = None
    150152        self.vsock_auth_class = None
    151153        self._when_ready = []
     
    157159        #networking bits:
    158160        self._socket_info = []
    159161        self._potential_protocols = []
     162        self._udp_listeners = []
     163        self._udp_protocols = {}
    160164        self._tcp_proxy_clients = []
    161165        self._tcp_proxy = ""
    162         self._ssl_wrap_socket = None
     166        self._ssl_wrap_server_socket = None
     167        self._ssl_wrap_client_socket = None
    163168        self._accept_timeout = SOCKET_TIMEOUT + 1
    164169        self.ssl_mode = None
    165170        self._html = False
     
    383388        self._default_packet_handlers = {
    384389            "hello":                                self._process_hello,
    385390            "disconnect":                           self._process_disconnect,
     391            "udp-control":                          self._process_udp_control,
    386392            Protocol.CONNECTION_LOST:               self._process_connection_lost,
    387393            Protocol.GIBBERISH:                     self._process_gibberish,
    388394            Protocol.INVALID:                       self._process_invalid,
     
    538544        self.do_cleanup()
    539545        self.cleanup_protocols(protocols, reason, True)
    540546        self._potential_protocols = []
     547        self.cleanup_udp_listeners()
    541548
    542549    def do_cleanup(self):
    543550        #allow just a bit of time for the protocol packet flush
     
    544551        sleep(0.1)
    545552
    546553
     554    def cleanup_udp_listeners(self):
     555        for udpl in self._udp_listeners:
     556            udpl.close()
     557        self._udp_listeners = []
     558
    547559    def cleanup_all_protocols(self, reason):
    548560        protocols = self.get_all_protocols()
    549561        self.cleanup_protocols(protocols, reason)
     
    569581            #named pipe listener uses a thread:
    570582            sock.new_connection_cb = self._new_connection
    571583            sock.start()
     584        elif socktype in ("udp", "dtls"):
     585            #socket_info = self.socket_info.get(sock)
     586            from xpra.net.udp_protocol import UDPListener
     587            udpl = UDPListener(sock, self.process_udp_packet)
     588            self._udp_listeners.append(udpl)
    572589        else:
    573590            from xpra.gtk_common.gobject_compat import import_glib
    574591            glib = import_glib()
     
    673690        peek_data, line1 = self.peek_connection(conn)
    674691
    675692        def ssl_wrap():
    676             ssl_sock = self._ssl_wrap_socket(sock)
     693            ssl_sock = self._ssl_wrap_server_socket(sock)
    677694            netlog("ssl wrapped socket(%s)=%s", sock, ssl_sock)
    678695            if ssl_sock is None:
    679696                #None means EOF! (we don't want to import ssl bits here)
     
    729746            self.handle_rfb_connection(conn)
    730747            return
    731748
    732         elif socktype=="tcp" and peek_data and (self._html or self._tcp_proxy or self._ssl_wrap_socket):
     749        elif peek_data and (
     750            (socktype=="tcp" and  (self._html or self._tcp_proxy or self._ssl_wrap_server_socket)) or
     751            (socktype=="udp" and self._ssl_wrap_server_socket and self._ssl_wrap_client_socket)
     752            ):
    733753            #see if the packet data is actually xpra or something else
    734754            #that we need to handle via a tcp proxy, ssl wrapper or the websockify adapter:
    735755            try:
     
    763783        netlog("make_protocol(%s, %s)", socktype, conn)
    764784        socktype = socktype.lower()
    765785        protocol = protocol_class(conn)
     786        protocol.socket_type = socktype
    766787        self._potential_protocols.append(protocol)
    767788        protocol.challenge_sent = False
    768789        protocol.authenticator = None
    769790        protocol.encryption = None
    770791        protocol.keyfile = None
     792        protocol.auth_class = {
     793            "tcp"           : self.tcp_auth_class,
     794            "ssl"           : self.ssl_auth_class,
     795            "udp"           : self.udp_auth_class,
     796            "dtls"          : self.dtls_auth_class,
     797            "ws"            : self.ws_auth_class,
     798            "wss"           : self.wss_auth_class,
     799            "rfb"           : self.rfb_auth_class,
     800            "vsock"         : self.vsock_auth_class,
     801            "unix-domain"   : self.auth_class,
     802            "named-pipe"    : self.auth_class,
     803            }[socktype]
    771804        if socktype=="tcp":
    772             protocol.auth_class = self.tcp_auth_class
     805            #special case for legacy encryption code:
    773806            protocol.encryption = self.tcp_encryption
    774807            protocol.keyfile = self.tcp_encryption_keyfile
    775808            if protocol.encryption and ENCRYPT_FIRST_PACKET:
     
    776809                authlog("encryption=%s, keyfile=%s", protocol.encryption, protocol.keyfile)
    777810                password = self.get_encryption_key(None, protocol.keyfile)
    778811                protocol.set_cipher_in(protocol.encryption, DEFAULT_IV, password, DEFAULT_SALT, DEFAULT_ITERATIONS, INITIAL_PADDING)
    779         elif socktype=="ssl":
    780             protocol.auth_class = self.ssl_auth_class
    781         elif socktype=="ws":
    782             protocol.auth_class = self.ws_auth_class
    783         elif socktype=="wss":
    784             protocol.auth_class = self.wss_auth_class
    785         elif socktype=="rfb":
    786             protocol.auth_class = self.rfb_auth_class
    787         elif socktype=="vsock":
    788             protocol.auth_class = self.vsock_auth_class
    789         else:
    790             protocol.auth_class = self.auth_class
    791         protocol.socket_type = socktype
    792812        protocol.invalid_header = self.invalid_header
    793813        authlog("socktype=%s, auth class=%s, encryption=%s, keyfile=%s", socktype, protocol.auth_class, protocol.encryption, protocol.keyfile)
    794814        protocol.start()
     
    811831            #xpra packet header, no need to wrap this connection
    812832            return True, conn, peek_data
    813833        frominfo = pretty_socket(conn.remote)
    814         if self._ssl_wrap_socket and peek_data[0] in (chr(0x16), 0x16):
    815             socktype = "SSL"
     834        if self._ssl_wrap_server_socket and peek_data[0] in (chr(0x16), 0x16):
     835            socktype = {
     836                "udp"   : "dtls",
     837                "tcp"   : "SSL",
     838                }[socktype]
    816839            sock, sockname, address, target = conn._socket, conn.local, conn.remote, conn.target
    817             sock = self._ssl_wrap_socket(sock)
     840            sock = self._ssl_wrap_server_socket(sock)
    818841            if sock is None:
    819842                #None means EOF! (we don't want to import ssl bits here)
    820843                netlog("ignoring SSL EOF error")
     
    850873        return True, conn, peek_data
    851874
    852875    def invalid_header(self, proto, data, msg=""):
    853         netlog("invalid_header(%s, %s bytes: '%s', %s) input_packetcount=%s, tcp_proxy=%s, html=%s, ssl=%s", proto, len(data or ""), msg, repr_ellipsized(data), proto.input_packetcount, self._tcp_proxy, self._html, bool(self._ssl_wrap_socket))
     876        netlog("invalid_header(%s, %s bytes: '%s', %s) input_packetcount=%s, tcp_proxy=%s, html=%s, ssl=%s",
     877               proto, len(data or ""), msg, repr_ellipsized(data), proto.input_packetcount, self._tcp_proxy, self._html, bool(self._ssl_wrap_server_socket))
    854878        err = "invalid packet format, %s" % self.guess_header_protocol(data)
    855879        proto.gibberish(err, data)
    856880
     
    14441468        for socktype, auth_class in {
    14451469                                     "tcp"          : self.tcp_auth_class,
    14461470                                     "ssl"          : self.ssl_auth_class,
     1471                                     "ws"           : self.ws_auth_class,
     1472                                     "wss"          : self.wss_auth_class,
     1473                                     "udp"          : self.udp_auth_class,
     1474                                     "dtls"         : self.dtls_auth_class,
     1475                                     "rfb"          : self.rfb_auth_class,
    14471476                                     "unix-domain"  : self.auth_class,
    14481477                                     "vsock"        : self.vsock_auth_class,
    14491478                                     }.items():
     
    14711500        except:
    14721501            netlog.error("Unhandled error while processing a '%s' packet from peer using %s", packet_type, handler, exc_info=True)
    14731502
     1503
    14741504    def handle_rfb_connection(self, conn):
    14751505        log.error("Error: RFB protocol is not supported by this server")
    14761506        conn.close()
     1507
     1508
     1509    def _process_udp_control(self, proto, packet):
     1510        proto.process_control(*packet[1:])
     1511
     1512    def process_udp_packet(self, udp_listener, uuid, seqno, synchronous, chunk, chunks, data, bfrom):
     1513        #log.info("process_udp_packet%s", (udp_listener, uuid, seqno, synchronous, chunk, chunks, len(data), bfrom))
     1514        protocol = self._udp_protocols.get(uuid)
     1515        if not protocol:
     1516            from xpra.net.udp_protocol import UDPServerProtocol, UDPSocketConnection
     1517            def udp_protocol_class(conn):
     1518                protocol = UDPServerProtocol(self, conn, self.process_packet)
     1519                protocol.large_packets.append("info-response")
     1520                protocol.receive_aliases.update(self._aliases)
     1521                return protocol
     1522            socktype = "udp"        #breaks dtls...
     1523            host, port = bfrom
     1524            sock = udp_listener._socket
     1525            if socktype=="dtls":
     1526                from dtls import do_patch   #@UnresolvedImport
     1527                do_patch()
     1528                if False:
     1529                    family = socket.AF_INET6
     1530                else:
     1531                    family = socket.AF_INET
     1532                #sock = socket.socket(family, socket.SOCK_DGRAM)
     1533                #sock.bind(("0.0.0.0", 10003))
     1534                sock.connect(bfrom)
     1535                sock  = self._ssl_wrap_client_socket(sock, handshake=False)
     1536                assert sock, "failed to wrap socket %s" % sock
     1537                sockname = sock.getsockname()
     1538                conn = SocketConnection(sock, sockname, (host, port), (host, port), socktype)
     1539            else:
     1540                sockname = sock.getsockname()
     1541                conn = UDPSocketConnection(sock, sockname, (host, port), (host, port), socktype)
     1542            conn.timeout = SOCKET_TIMEOUT
     1543            protocol = self.do_make_protocol(socktype, conn, udp_protocol_class)
     1544            self._udp_protocols[uuid] = protocol
     1545        #assert packetsize==datalen, "expected datalen=packetsize, but got %i!=%i" % (datalen, packetsize)
     1546        #assert len(data)==datalen, "expected %i bytes but got %i" % (datalen, len(data))
     1547        protocol.process_udp_data(seqno, synchronous, chunk, chunks, data, bfrom)
  • xpra/server/socket_util.py

     
    108108        log("create_tcp_socket%s", (host, iport), exc_info=True)
    109109        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
    110110    def cleanup_tcp_socket():
    111         log.info("closing %s socket %s:%s", socktype, host, iport)
     111        log.info("closing %s socket %s:%s", socktype.lower(), host, iport)
    112112        try:
    113113            tcp_socket.close()
    114114        except:
     
    117117    log("%s: %s:%s : %s", socktype, host, iport, socket)
    118118    return socktype, tcp_socket, (host, iport)
    119119
     120def create_udp_socket(host, iport):
     121    if host.find(":")<0:
     122        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     123        sockaddr = (host, iport)
     124    else:
     125        assert socket.has_ipv6, "specified an IPv6 address but this is not supported"
     126        res = socket.getaddrinfo(host, iport, socket.AF_INET6, socket.SOCK_DGRAM)
     127        listener = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
     128        sockaddr = res[0][-1]
     129    listener.bind(sockaddr)
     130    return listener
    120131
    121 def parse_bind_tcp(bind_tcp):
    122     tcp_sockets = set()
    123     if bind_tcp:
    124         for spec in bind_tcp:
     132def setup_udp_socket(host, iport, socktype="udp"):
     133    from xpra.log import Logger
     134    log = Logger("network")
     135    try:
     136        udp_socket = create_udp_socket(host, iport)
     137    except Exception as e:
     138        log("create_udp_socket%s", (host, iport), exc_info=True)
     139        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
     140    def cleanup_udp_socket():
     141        log.info("closing %s socket %s:%s", socktype, host, iport)
     142        try:
     143            udp_socket.close()
     144        except:
     145            pass
     146    add_cleanup(cleanup_udp_socket)
     147    log("%s: %s:%s : %s", socktype, host, iport, socket)
     148    return socktype, udp_socket, (host, iport)
     149
     150
     151def parse_bind_ip(bind_ip):
     152    ip_sockets = set()
     153    if bind_ip:
     154        for spec in bind_ip:
    125155            if ":" not in spec:
    126                 raise InitException("TCP port must be specified as [HOST]:PORT")
     156                raise InitException("port must be specified as [HOST]:PORT")
    127157            host, port = spec.rsplit(":", 1)
    128158            if host == "":
    129159                host = "127.0.0.1"
     
    135165                    assert iport>0 and iport<2**16
    136166                except:
    137167                    raise InitException("invalid port number: %s" % port)
    138             tcp_sockets.add((host, iport))
    139     return tcp_sockets
     168            ip_sockets.add((host, iport))
     169    return ip_sockets
    140170
    141171def setup_vsock_socket(cid, iport):
    142172    from xpra.log import Logger
  • xpra/server/source.py

     
    14021402#
    14031403    def next_packet(self):
    14041404        """ Called by protocol.py when it is ready to send the next packet """
    1405         packet, start_send_cb, end_send_cb, have_more = None, None, None, False
     1405        packet, start_send_cb, end_send_cb, fail_cb, have_more = None, None, None, None, False
    14061406        if not self.is_closed():
    14071407            if len(self.ordinary_packets)>0:
    14081408                packet = self.ordinary_packets.pop(0)
    14091409            elif len(self.packet_queue)>0:
    1410                 packet, _, _, start_send_cb, end_send_cb = self.packet_queue.popleft()
     1410                packet, _, _, start_send_cb, end_send_cb, fail_cb = self.packet_queue.popleft()
    14111411            have_more = packet is not None and (len(self.ordinary_packets)>0 or len(self.packet_queue)>0)
    1412         return packet, start_send_cb, end_send_cb, have_more
     1412        return packet, start_send_cb, end_send_cb, fail_cb, have_more
    14131413
    14141414    def send(self, *parts):
    14151415        """ This method queues non-damage packets (higher priority) """
     
    22942294        self.statistics.compression_work_qsizes.append((monotonic_time(), self.encode_work_queue.qsize()))
    22952295        self.encode_work_queue.put(fn_and_args)
    22962296
    2297     def queue_packet(self, packet, wid=0, pixels=0, start_send_cb=None, end_send_cb=None):
     2297    def queue_packet(self, packet, wid=0, pixels=0, start_send_cb=None, end_send_cb=None, fail_cb=None):
    22982298        """
    22992299            Add a new 'draw' packet to the 'packet_queue'.
    23002300            Note: this code runs in the non-ui thread
     
    23032303        self.statistics.packet_qsizes.append((now, len(self.packet_queue)))
    23042304        if wid>0:
    23052305            self.statistics.damage_packet_qpixels.append((now, wid, sum(x[2] for x in list(self.packet_queue) if x[1]==wid)))
    2306         self.packet_queue.append((packet, wid, pixels, start_send_cb, end_send_cb))
     2306        self.packet_queue.append((packet, wid, pixels, start_send_cb, end_send_cb, fail_cb))
    23072307        p = self.protocol
    23082308        if p:
    23092309            p.source_has_more()
  • xpra/server/window/window_source.py

     
    17261726            now = monotonic_time()
    17271727            damage_in_latency = now-process_damage_time
    17281728            self.statistics.damage_in_latency.append((now, width*height, actual_batch_delay, damage_in_latency))
    1729         self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent)
     1729        fail_cb = self.get_fail_cb(packet)
     1730        #log.info("queuing %s packet with fail_cb=%s", coding, fail_cb)
     1731        self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent, fail_cb)
    17301732
     1733    def get_fail_cb(self, packet):
     1734        def resend():
     1735            log.warn("paint packet failure, resending")
     1736            x,y,width,height = packet[2:6]
     1737            damage_packet_sequence = packet[8]
     1738            self.damage_packet_acked(damage_packet_sequence, width, height, 0, "")
     1739            self.idle_add(self.damage, x, y, width, height)
     1740        return resend
     1741
    17311742    def damage_packet_acked(self, damage_packet_sequence, width, height, decode_time, message):
    17321743        """
    17331744            The client is acknowledging a damage packet,
  • xpra/server/window/window_video_source.py

     
    14261426        return {}
    14271427
    14281428
     1429    def get_fail_cb(self, packet):
     1430        coding = packet[6]
     1431        if coding in self.common_video_encodings:
     1432            return None
     1433        return WindowSource.get_fail_cb(self, packet)
     1434
     1435
    14291436    def make_draw_packet(self, x, y, w, h, coding, data, outstride, client_options={}, options={}):
    14301437        #overriden so we can invalidate the scroll data:
    14311438        #log.error("make_draw_packet%s", (x, y, w, h, coding, "..", outstride, client_options)