xpra icon
Bug tracker and wiki

Ticket #639: udp-v2.patch

File udp-v2.patch, 35.0 KB (added by Antoine Martin, 2 years ago)

working udp connections (but without handling client authentication, packet loss, etc..)

  • xpra/client/client_base.py

     
    232232        raise NotImplementedError()
    233233
    234234    def setup_connection(self, conn):
    235         netlog("setup_connection(%s) timeout=%s", conn, conn.timeout)
    236         self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     235        netlog.info("setup_connection(%s) timeout=%s, socktype=%s", conn, conn.timeout, conn.socktype)
     236        if conn.socktype in ("udp", "dtls"):
     237            from xpra.net.udp_protocol import UDPClientProtocol
     238            self._protocol = UDPClientProtocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     239        else:
     240            self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
    237241        self._protocol.large_packets.append("keymap-changed")
    238242        self._protocol.large_packets.append("server-settings")
    239243        self._protocol.large_packets.append("logging")
  • xpra/net/bytestreams.py

     
    277277            i = s
    278278        log("%s.close() for socket=%s", self, i)
    279279        Connection.close(self)
    280         s.settimeout(0)
     280        try:
     281            s.settimeout(0)
     282        except:
     283            pass
    281284        #this is more proper but would break the proxy server:
    282285        #s.shutdown(socket.SHUT_RDWR)
    283286        s.close()
     
    306309        s = self._socket
    307310        if not s:
    308311            return None
    309         return {
    310                 #"class"         : str(type(s)),
    311                 "fileno"        : s.fileno(),
    312                 "timeout"       : int(1000*(s.gettimeout() or 0)),
     312        info = {
    313313                "family"        : FAMILY_STR.get(s.family, s.family),
    314314                "proto"         : s.proto,
    315315                "type"          : PROTOCOL_STR.get(s.type, s.type),
    316316                }
     317        try:
     318            info["timeout"] = int(1000*(s.gettimeout() or 0))
     319        except:
     320            pass
     321        if hasattr(s, "fileno"):
     322            info["fileno"] = s.fileno()
     323        return info
    317324
    318 
    319325try:
    320326    #this wrapper class allows us to override the normal ssl.Socket
    321327    #class so that we can fake peek() support by actually reading from the socket
  • xpra/net/protocol.py

     
    344344        self._write_queue.put(items)
    345345        self.output_packetcount += 1
    346346
     347
    347348    def start_write_thread(self):
    348349        self._write_thread = start_thread(self._write_thread_loop, "write", daemon=True)
    349350
     
    579580                    if not self._closed:
    580581                        log.error("Error on write start callback %s", start_cb, exc_info=True)
    581582            while buf and not self._closed:
    582                 written = con.write(buf)
     583                written = self.con_write(con, buf)
    583584                #example test code, for sending small chunks very slowly:
    584585                #written = con.write(buf[:1024])
    585586                #import time
     
    595596                        log.error("Error on write end callback %s", end_cb, exc_info=True)
    596597        return True
    597598
     599    def con_write(self, con, data):
     600        return con.write(data)
     601
     602
    598603    def _read_thread_loop(self):
    599604        self._io_thread_loop("read", self._read)
    600605    def _read(self):
     
    676681                return
    677682            self._internal_error("error in network packet reading/parsing", e, exc_info=True)
    678683
     684    def read_preprocess(self, buf):
     685        return buf
     686
    679687    def do_read_parse_thread_loop(self):
    680688        """
    681689            Process the individual network packets placed in _read_queue.
     
    701709                log("parse thread: empty marker, exiting")
    702710                self.idle_add(self.close)
    703711                return
     712            buf = self.read_preprocess(buf)
    704713            if read_buffer:
    705714                read_buffer = read_buffer + buf
    706715            else:
  • xpra/net/udp_protocol.py

     
     1# This file is part of Xpra.
     2# Copyright (C) 2017 Antoine Martin <antoine@devloop.org.uk>
     3# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
     4# later version. See the file COPYING for details.
     5
     6import struct
     7
     8from xpra.log import Logger
     9log = Logger("network", "protocol")
     10
     11from xpra.os_util import Queue, memoryview_to_bytes
     12from xpra.util import envint
     13from xpra.make_thread import make_thread
     14from xpra.net.protocol import Protocol, JOIN_TYPES
     15from xpra.net.bytestreams import SocketConnection
     16
     17READ_BUFFER_SIZE = envint("XPRA_READ_BUFFER_SIZE", 65536)
     18
     19
     20#UUID, seqno, packetsize, dataoffset, datalen
     21_header_struct = struct.Struct('!QQLLL')
     22_header_size = _header_struct.size
     23
     24
     25class UDPListener(object):
     26
     27    def __init__(self, scheduler, sock, socktype, process_packet_cb):
     28        assert scheduler is not None
     29        assert sock is not None
     30        self.timeout_add = scheduler.timeout_add
     31        self.idle_add = scheduler.idle_add
     32        self._closed = False
     33        self._socket = sock
     34        self._socktype = socktype
     35        self._process_packet_cb =  process_packet_cb
     36        self._read_queue = Queue(20)
     37        self._read_thread = make_thread(self._read_thread_loop, "read", daemon=True)
     38        self._read_parser_thread = None         #started when needed
     39        self._read_queue_put = self.read_queue_put
     40
     41    def __repr__(self):
     42        return "UDPListener(%s)" % self._socket
     43
     44    def start(self):
     45        def start_network_read_thread():
     46            if not self._closed:
     47                self._read_thread.start()
     48        self.idle_add(start_network_read_thread)
     49
     50
     51    def _io_thread_loop(self, name, callback):
     52        try:
     53            log("io_thread_loop(%s, %s) loop starting", name, callback)
     54            while not self._closed and callback():
     55                pass
     56            log("io_thread_loop(%s, %s) loop ended, closed=%s", name, callback, self._closed)
     57        except Exception as e:
     58            #can happen during close(), in which case we just ignore:
     59            if not self._closed:
     60                log.error("Error: %s on %s failed: %s", name, self._socket, type(e), exc_info=True)
     61                self.close()
     62
     63    def _read_thread_loop(self):
     64        self._io_thread_loop("read", self._read)
     65    def _read(self):
     66        buf, bfrom = self._socket.recvfrom(READ_BUFFER_SIZE)
     67        #log("read thread: got data of size %s: %s", len(buf), repr_ellipsized(buf))
     68        #add to the read queue (or whatever takes its place - see steal_connection)
     69        self._read_queue_put((buf, bfrom))
     70        if not buf:
     71            log("read thread: eof")
     72            #give time to the parse thread to call close itself
     73            #so it has time to parse and process the last packet received
     74            self.timeout_add(1000, self.close)
     75            return False
     76        return True
     77
     78    def read_queue_put(self, data):
     79        #start the parse thread if needed:
     80        if not self._read_parser_thread and not self._closed:
     81            if data is None:
     82                log("empty marker in read queue, exiting")
     83                self.idle_add(self.close)
     84                return
     85            self._read_parser_thread = make_thread(self._read_parse_thread_loop, "parse", daemon=True)
     86            self._read_parser_thread.start()
     87        self._read_queue.put(data)
     88        #from now on, take shortcut:
     89        if self._read_queue_put==self.read_queue_put:
     90            self._read_queue_put = self._read_queue.put
     91
     92    def _read_parse_thread_loop(self):
     93        log("read_parse_thread_loop starting")
     94        try:
     95            self.do_read_parse_thread_loop()
     96        except Exception:
     97            if self._closed:
     98                return
     99            log.error("Error: network packet reading/parsing", exc_info=True)
     100
     101    def do_read_parse_thread_loop(self):
     102        while not self._closed:
     103            buf, bfrom = self._read_queue.get()
     104            if not buf:
     105                log("parse thread: empty marker, exiting")
     106                self.idle_add(self.close)
     107                return
     108            values = list(_header_struct.unpack_from(buf[:_header_size]))
     109            values.append(buf[_header_size:])
     110            values.append(bfrom)
     111            self._process_packet_cb(self, *values)
     112           
     113
     114    def close(self):
     115        log("UDPListener.close() closed=%s, socket=%s", self._closed, self._socket)
     116        if self._closed:
     117            return
     118        self._closed = True
     119        s = self._socket
     120        if s:
     121            try:
     122                log("Protocol.close() calling %s", s.close)
     123                s.close()
     124            except:
     125                log.error("error closing %s", self._socket, exc_info=True)
     126            self._socket = None
     127        self.terminate_queue_threads()
     128        self.idle_add(self.clean)
     129        log("UDPListener.close() done")
     130
     131    def clean(self):
     132        #clear all references to ensure we can get garbage collected quickly:
     133        self._read_thread = None
     134        self._read_parser_thread = None
     135
     136    def terminate_queue_threads(self):
     137        log("terminate_queue_threads()")
     138        exit_queue = Queue()
     139        for _ in range(10):     #just 2 should be enough!
     140            exit_queue.put(None)
     141        try:
     142            orq = self._read_queue
     143            self._read_queue = exit_queue
     144            #discard all elements in the old queue and push the None marker:
     145            try:
     146                while orq.qsize()>0:
     147                    orq.read(False)
     148            except:
     149                pass
     150            orq.put_nowait(None)
     151        except:
     152            pass
     153
     154class UDPProtocol(Protocol):
     155
     156    def con_write(self, con, data):
     157        log.info("UDP.con_write(%s, %s)", con, len(data))
     158        if type(data) not in JOIN_TYPES:
     159            data = memoryview_to_bytes(data)
     160        uuid = 0 #todo!
     161        seqno = self.output_raw_packetcount
     162        packetsize = len(data)
     163        header_and_data = _header_struct.pack(uuid, seqno, packetsize, 0, packetsize) + data
     164        return con.write(header_and_data)
     165
     166class UDPServerProtocol(UDPProtocol):
     167
     168    def _read_thread_loop(self):
     169        #server protocol is not used to read,
     170        #we rely on the listener to dispatch packets instead
     171        pass
     172
     173class UDPClientProtocol(UDPProtocol):
     174    def read_preprocess(self, buf):
     175        return buf[_header_size:]
     176
     177
     178class UDPSocketConnection(SocketConnection):
     179
     180    def write(self, buf):
     181        return self._write(self._socket.sendto, buf, self.remote)
     182
     183    def close(self):
     184        #don't close the socket, we're re-using it
     185        pass
  • xpra/scripts/config.py

     
    437437                    "auth"              : str,
    438438                    "vsock-auth"        : str,
    439439                    "tcp-auth"          : str,
     440                    "udp-auth"          : str,
     441                    "dtls-auth"         : str,
    440442                    "ws-auth"           : str,
    441443                    "wss-auth"          : str,
    442444                    "ssl-auth"          : str,
     
    594596                    "bind"              : list,
    595597                    "bind-vsock"        : list,
    596598                    "bind-tcp"          : list,
     599                    "bind-udp"          : list,
     600                    "bind-dtls"         : list,
    597601                    "bind-ws"           : list,
    598602                    "bind-wss"          : list,
    599603                    "bind-ssl"          : list,
     
    615619    "start-after-connect", "start-child-after-connect",
    616620    "start-on-connect", "start-child-on-connect",
    617621    ]
    618 BIND_OPTIONS = ["bind", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
     622BIND_OPTIONS = ["bind", "bind-tcp", "bind-udp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
    619623
    620624#keep track of the options added since v1,
    621625#so we can generate command lines that work with older supported versions:
     
    679683    "av-sync", "global-menus",
    680684    "printing", "file-transfer", "open-command", "open-files", "start-new-commands",
    681685    "mmap", "mmap-group", "mdns",
    682     "auth", "vsock-auth", "tcp-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
    683     "bind", "bind-vsock", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
     686    "auth", "vsock-auth", "tcp-auth", "udp-auth", "dtls-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
     687    "bind", "bind-vsock", "bind-tcp", "bind-udp", "bind-dtls", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
    684688    "start", "start-child",
    685689    "start-after-connect", "start-child-after-connect",
    686690    "start-on-connect", "start-child-on-connect",
     
    799803                    "auth"              : "",
    800804                    "vsock-auth"        : "",
    801805                    "tcp-auth"          : "",
     806                    "udp-auth"          : "",
     807                    "dtls-auth"         : "",
    802808                    "ws-auth"           : "",
    803809                    "wss-auth"          : "",
    804810                    "ssl-auth"          : "",
     
    946952                    "bind"              : bind_dirs,
    947953                    "bind-vsock"        : [],
    948954                    "bind-tcp"          : [],
     955                    "bind-udp"          : [],
     956                    "bind-dtls"         : [],
    949957                    "bind-ws"           : [],
    950958                    "bind-wss"          : [],
    951959                    "bind-ssl"          : [],
  • xpra/scripts/main.py

     
    534534                          metavar="[HOST]:[PORT]",
    535535                          help="Listen for connections over TCP (use --tcp-auth to secure it)."
    536536                            + " You may specify this option multiple times with different host and port combinations")
     537        group.add_option("--bind-udp", action="append",
     538                          dest="bind_udp", default=list(defaults.bind_udp or []),
     539                          metavar="[HOST]:[PORT]",
     540                          help="Listen for connections over UDP (use --udp-auth to secure it)."
     541                            + " You may specify this option multiple times with different host and port combinations")
     542        group.add_option("--bind-dtls", action="append",
     543                          dest="bind_dtls", default=list(defaults.bind_dtls or []),
     544                          metavar="[HOST]:[PORT]",
     545                          help="Listen for connections over UDP + DTLS (use --dtls-auth to secure it)."
     546                            + " You may specify this option multiple times with different host and port combinations")
    537547        group.add_option("--bind-ws", action="append",
    538548                          dest="bind_ws", default=list(defaults.bind_ws or []),
    539549                          metavar="[HOST]:[PORT]",
     
    558568        ignore({
    559569            "bind"      : defaults.bind,
    560570            "bind-tcp"  : defaults.bind_tcp,
     571            "bind-udp"  : defaults.bind_udp,
    561572            "bind-ws"   : defaults.bind_ws,
    562573            "bind-wss"  : defaults.bind_wss,
    563574            "bind-ssl"  : defaults.bind_ssl,
     
    974985    group.add_option("--tcp-auth", action="store",
    975986                      dest="tcp_auth", default=defaults.tcp_auth,
    976987                      help="The authentication module to use for TCP sockets (default: '%default')")
     988    group.add_option("--udp-auth", action="store",
     989                      dest="udp_auth", default=defaults.udp_auth,
     990                      help="The authentication module to use for UDP sockets (default: '%default')")
    977991    group.add_option("--ws-auth", action="store",
    978992                      dest="ws_auth", default=defaults.ws_auth,
    979993                      help="The authentication module to use for Websockets (default: '%default')")
     
    16861700        if opts.socket_dir:
    16871701            desc["socket_dir"] = opts.socket_dir
    16881702        return desc
    1689     elif display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
    1690             display_name.startswith("ssl:") or display_name.startswith("ssl/"):
    1691         ctype = display_name[:3]        #ie: "ssl" or "tcp"
    1692         separator = display_name[3]     # ":" or "/"
     1703    elif (
     1704        display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
     1705        display_name.startswith("ssl:") or display_name.startswith("ssl/") or \
     1706        display_name.startswith("udp:") or display_name.startswith("udp/") or \
     1707        display_name.startswith("dtls:") or display_name.startswith("dtls/")
     1708        ):
     1709        ctype = display_name[:4].rstrip(":/")   #ie: "ssl" or "tcp"
     1710        separator = display_name[len(ctype)]     # ":" or "/"
    16931711        desc.update({
    16941712                     "type"     : ctype,
    16951713                     })
     
    20732091        from xpra.net.bytestreams import SocketConnection
    20742092        return SocketConnection(sock, "local", "host", (CID_TYPES.get(cid, cid), iport), dtype)
    20752093
    2076     elif dtype in ("tcp", "ssl", "ws", "wss"):
     2094    elif dtype in ("tcp", "ssl", "ws", "wss", "udp", "dtls"):
    20772095        if display_desc.get("ipv6"):
    20782096            assert socket.has_ipv6, "no IPv6 support"
    20792097            family = socket.AF_INET6
     
    20892107                socket.AF_INET  : "IPv4",
    20902108                }.get(family, family), (host, port), e))
    20912109        sockaddr = addrinfo[0][-1]
    2092         sock = socket.socket(family, socket.SOCK_STREAM)
    2093         sock.settimeout(SOCKET_TIMEOUT)
    2094         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
     2110        if dtype in ("udp", "dtls"):
     2111            sock = socket.socket(family, socket.SOCK_DGRAM)
     2112        else:
     2113            sock = socket.socket(family, socket.SOCK_STREAM)
     2114            sock.settimeout(SOCKET_TIMEOUT)
     2115            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
    20952116        strict_host_check = display_desc.get("strict-host-check")
    20962117        if strict_host_check is False:
    20972118            opts.ssl_server_verify_mode = "none"
    20982119        conn = _socket_connect(sock, sockaddr, display_name, dtype)
    2099         if dtype in ("ssl", "wss"):
     2120        if dtype in ("ssl", "wss", "dtls"):
     2121            if dtype=="dtls":
     2122                from dtls import do_patch   #@UnresolvedImport
     2123                do_patch()
    21002124            wrap_socket = ssl_wrap_socket_fn(opts, server_side=False)
    21012125            sock = wrap_socket(sock)
    21022126            assert sock, "failed to wrap socket %s" % sock
  • xpra/scripts/server.py

     
    357357    if opts.encoding=="help" or "help" in opts.encodings:
    358358        return show_encoding_help(opts)
    359359
    360     from xpra.server.socket_util import parse_bind_tcp, parse_bind_vsock
    361     bind_tcp = parse_bind_tcp(opts.bind_tcp)
    362     bind_ssl = parse_bind_tcp(opts.bind_ssl)
    363     bind_ws = parse_bind_tcp(opts.bind_ws)
    364     bind_wss = parse_bind_tcp(opts.bind_wss)
    365     bind_rfb = parse_bind_tcp(opts.bind_rfb)
     360    from xpra.server.socket_util import parse_bind_ip, parse_bind_vsock
     361    bind_tcp = parse_bind_ip(opts.bind_tcp)
     362    bind_udp = parse_bind_ip(opts.bind_udp)
     363    bind_dtls= parse_bind_ip(opts.bind_dtls)
     364    bind_ssl = parse_bind_ip(opts.bind_ssl)
     365    bind_ws  = parse_bind_ip(opts.bind_ws)
     366    bind_wss = parse_bind_ip(opts.bind_wss)
     367    bind_rfb = parse_bind_ip(opts.bind_rfb)
    366368    bind_vsock = parse_bind_vsock(opts.bind_vsock)
    367369
    368370    assert mode in ("start", "start-desktop", "upgrade", "shadow", "proxy")
     
    567569    sockets = []
    568570
    569571    #SSL sockets:
    570     wrap_socket_fn = None
     572    wrap_server_socket_fn = None
     573    wrap_client_socket_fn = None
    571574    need_ssl = False
    572575    ssl_opt = opts.ssl.lower()
    573576    if ssl_opt in TRUE_OPTIONS or bind_ssl or bind_wss:
     
    582585    if need_ssl:
    583586        from xpra.scripts.main import ssl_wrap_socket_fn
    584587        try:
    585             wrap_socket_fn = ssl_wrap_socket_fn(opts, server_side=True)
    586             netlog("wrap_socket_fn=%s", wrap_socket_fn)
     588            wrap_server_socket_fn = ssl_wrap_socket_fn(opts, server_side=True)
     589            wrap_client_socket_fn = ssl_wrap_socket_fn(opts, server_side=False)
     590            netlog("wrap socket functions: %s, %s", wrap_server_socket_fn, wrap_client_socket_fn)
    587591        except Exception as e:
    588592            netlog("SSL error", exc_info=True)
    589593            cpaths = csv("'%s'" % x for x in (opts.ssl_cert, opts.ssl_key) if x)
    590594            raise InitException("cannot create SSL socket, check your certificate paths (%s): %s" % (cpaths, e))
    591595
     596    from xpra.server.socket_util import setup_tcp_socket, setup_udp_socket, setup_vsock_socket, setup_local_sockets
    592597    def add_mdns(socktype, host, port):
    593598        recs = mdns_recs.setdefault(socktype.lower(), [])
    594599        rec = (host, port)
     
    598603        socket = setup_tcp_socket(host, iport, socktype)
    599604        sockets.append(socket)
    600605        add_mdns(socktype, host, iport)
    601 
     606    def add_udp_socket(socktype, host, iport):
     607        socket = setup_udp_socket(host, iport, socktype)
     608        if socktype=="dtls":
     609            from dtls import do_patch   #@UnresolvedImport
     610            do_patch()
     611        sockets.append(socket)
     612        add_mdns(socktype, host, iport)
    602613    # Initialize the TCP sockets before the display,
    603614    # That way, errors won't make us kill the Xvfb
    604615    # (which may not be ours to kill at that point)
    605     from xpra.server.socket_util import setup_tcp_socket, setup_vsock_socket, setup_local_sockets
    606616    netlog("setting up SSL sockets: %s", bind_ssl)
    607617    for host, iport in bind_ssl:
    608618        add_tcp_socket("SSL", host, iport)
     
    615625        add_tcp_socket("tcp", host, iport)
    616626        if tcp_ssl:
    617627            add_mdns("ssl", host, iport)
     628    netlog("setting up UDP sockets: %s", bind_udp)
     629    for host, iport in bind_udp:
     630        add_udp_socket("udp", host, iport)
     631    netlog("setting up UDP+DTLS sockets: %s", bind_dtls)
     632    for host, iport in bind_dtls:
     633        add_udp_socket("dtls", host, iport)
    618634    netlog("setting up http / ws (websockets): %s", bind_ws)
    619635    for host, iport in bind_ws:
    620636        add_tcp_socket("ws", host, iport)
     
    908924            mdns_publish(display_name, mode, listen_on, mdns_info)
    909925
    910926    try:
    911         app._ssl_wrap_socket = wrap_socket_fn
     927        app._ssl_wrap_server_socket = wrap_server_socket_fn
     928        app._ssl_wrap_client_socket = wrap_client_socket_fn
    912929        app.original_desktop_display = desktop_display
    913930        app.exec_cwd = opts.chdir or cwd
    914931        app.init(opts)
  • xpra/server/server_core.py

     
    146146        self.ws_auth_class = None
    147147        self.wss_auth_class = None
    148148        self.ssl_auth_class = None
     149        self.udp_auth_class = None
     150        self.dtls_auth_class = None
    149151        self.rfb_auth_class = None
    150152        self.vsock_auth_class = None
    151153        self._when_ready = []
     
    157159        #networking bits:
    158160        self._socket_info = []
    159161        self._potential_protocols = []
     162        self._udp_listeners = []
     163        self._udp_protocols = {}
    160164        self._tcp_proxy_clients = []
    161165        self._tcp_proxy = ""
    162         self._ssl_wrap_socket = None
     166        self._ssl_wrap_server_socket = None
     167        self._ssl_wrap_client_socket = None
    163168        self._accept_timeout = SOCKET_TIMEOUT + 1
    164169        self.ssl_mode = None
    165170        self._html = False
     
    538543        self.do_cleanup()
    539544        self.cleanup_protocols(protocols, reason, True)
    540545        self._potential_protocols = []
     546        self.cleanup_udp_listeners()
    541547
    542548    def do_cleanup(self):
    543549        #allow just a bit of time for the protocol packet flush
     
    544550        sleep(0.1)
    545551
    546552
     553    def cleanup_udp_listeners(self):
     554        for udpl in self._udp_listeners:
     555            udpl.close()
     556        self._udp_listeners = []
     557
    547558    def cleanup_all_protocols(self, reason):
    548559        protocols = self.get_all_protocols()
    549560        self.cleanup_protocols(protocols, reason)
     
    569580            #named pipe listener uses a thread:
    570581            sock.new_connection_cb = self._new_connection
    571582            sock.start()
     583        elif socktype in ("udp", "dtls"):
     584            #socket_info = self.socket_info.get(sock)
     585            from xpra.net.udp_protocol import UDPListener
     586            udpl = UDPListener(self, sock, socktype, self.process_udp_packet)
     587            udpl.start()
     588            self._udp_listeners.append(udpl)
    572589        else:
    573590            from xpra.gtk_common.gobject_compat import import_glib
    574591            glib = import_glib()
     
    673690        peek_data, line1 = self.peek_connection(conn)
    674691
    675692        def ssl_wrap():
    676             ssl_sock = self._ssl_wrap_socket(sock)
     693            ssl_sock = self._ssl_wrap_server_socket(sock)
    677694            netlog("ssl wrapped socket(%s)=%s", sock, ssl_sock)
    678695            if ssl_sock is None:
    679696                #None means EOF! (we don't want to import ssl bits here)
     
    729746            self.handle_rfb_connection(conn)
    730747            return
    731748
    732         elif socktype=="tcp" and peek_data and (self._html or self._tcp_proxy or self._ssl_wrap_socket):
     749        elif peek_data and (
     750            (socktype=="tcp" and  (self._html or self._tcp_proxy or self._ssl_wrap_server_socket)) or
     751            (socktype=="udp" and self._ssl_wrap_server_socket and self._ssl_wrap_client_socket)
     752            ):
    733753            #see if the packet data is actually xpra or something else
    734754            #that we need to handle via a tcp proxy, ssl wrapper or the websockify adapter:
    735755            try:
     
    763783        netlog("make_protocol(%s, %s)", socktype, conn)
    764784        socktype = socktype.lower()
    765785        protocol = protocol_class(conn)
     786        protocol.socket_type = socktype
    766787        self._potential_protocols.append(protocol)
    767788        protocol.challenge_sent = False
    768789        protocol.authenticator = None
    769790        protocol.encryption = None
    770791        protocol.keyfile = None
     792        protocol.auth_class = {
     793            "tcp"           : self.tcp_auth_class,
     794            "ssl"           : self.ssl_auth_class,
     795            "udp"           : self.udp_auth_class,
     796            "dtls"          : self.dtls_auth_class,
     797            "ws"            : self.ws_auth_class,
     798            "wss"           : self.wss_auth_class,
     799            "rfb"           : self.rfb_auth_class,
     800            "vsock"         : self.vsock_auth_class,
     801            "unix-domain"   : self.auth_class,
     802            "named-pipe"    : self.auth_class,
     803            }[socktype]
    771804        if socktype=="tcp":
    772             protocol.auth_class = self.tcp_auth_class
     805            #special case for legacy encryption code:
    773806            protocol.encryption = self.tcp_encryption
    774807            protocol.keyfile = self.tcp_encryption_keyfile
    775808            if protocol.encryption and ENCRYPT_FIRST_PACKET:
     
    776809                authlog("encryption=%s, keyfile=%s", protocol.encryption, protocol.keyfile)
    777810                password = self.get_encryption_key(None, protocol.keyfile)
    778811                protocol.set_cipher_in(protocol.encryption, DEFAULT_IV, password, DEFAULT_SALT, DEFAULT_ITERATIONS, INITIAL_PADDING)
    779         elif socktype=="ssl":
    780             protocol.auth_class = self.ssl_auth_class
    781         elif socktype=="ws":
    782             protocol.auth_class = self.ws_auth_class
    783         elif socktype=="wss":
    784             protocol.auth_class = self.wss_auth_class
    785         elif socktype=="rfb":
    786             protocol.auth_class = self.rfb_auth_class
    787         elif socktype=="vsock":
    788             protocol.auth_class = self.vsock_auth_class
    789         else:
    790             protocol.auth_class = self.auth_class
    791         protocol.socket_type = socktype
    792812        protocol.invalid_header = self.invalid_header
    793813        authlog("socktype=%s, auth class=%s, encryption=%s, keyfile=%s", socktype, protocol.auth_class, protocol.encryption, protocol.keyfile)
    794814        protocol.start()
     
    811831            #xpra packet header, no need to wrap this connection
    812832            return True, conn, peek_data
    813833        frominfo = pretty_socket(conn.remote)
    814         if self._ssl_wrap_socket and peek_data[0] in (chr(0x16), 0x16):
    815             socktype = "SSL"
     834        if self._ssl_wrap_server_socket and peek_data[0] in (chr(0x16), 0x16):
     835            socktype = {
     836                "udp"   : "dtls",
     837                "tcp"   : "SSL",
     838                }[socktype]
    816839            sock, sockname, address, target = conn._socket, conn.local, conn.remote, conn.target
    817             sock = self._ssl_wrap_socket(sock)
     840            sock = self._ssl_wrap_server_socket(sock)
    818841            if sock is None:
    819842                #None means EOF! (we don't want to import ssl bits here)
    820843                netlog("ignoring SSL EOF error")
     
    850873        return True, conn, peek_data
    851874
    852875    def invalid_header(self, proto, data, msg=""):
    853         netlog("invalid_header(%s, %s bytes: '%s', %s) input_packetcount=%s, tcp_proxy=%s, html=%s, ssl=%s", proto, len(data or ""), msg, repr_ellipsized(data), proto.input_packetcount, self._tcp_proxy, self._html, bool(self._ssl_wrap_socket))
     876        netlog("invalid_header(%s, %s bytes: '%s', %s) input_packetcount=%s, tcp_proxy=%s, html=%s, ssl=%s",
     877               proto, len(data or ""), msg, repr_ellipsized(data), proto.input_packetcount, self._tcp_proxy, self._html, bool(self._ssl_wrap_server_socket))
    854878        err = "invalid packet format, %s" % self.guess_header_protocol(data)
    855879        proto.gibberish(err, data)
    856880
     
    14441468        for socktype, auth_class in {
    14451469                                     "tcp"          : self.tcp_auth_class,
    14461470                                     "ssl"          : self.ssl_auth_class,
     1471                                     "ws"           : self.ws_auth_class,
     1472                                     "wss"          : self.wss_auth_class,
     1473                                     "udp"          : self.udp_auth_class,
     1474                                     "dtls"         : self.dtls_auth_class,
     1475                                     "rfb"          : self.rfb_auth_class,
    14471476                                     "unix-domain"  : self.auth_class,
    14481477                                     "vsock"        : self.vsock_auth_class,
    14491478                                     }.items():
     
    14741503    def handle_rfb_connection(self, conn):
    14751504        log.error("Error: RFB protocol is not supported by this server")
    14761505        conn.close()
     1506
     1507    def process_udp_packet(self, udp_listener, uuid, seqno, packetsize, dataoffset, datalen, data, bfrom):
     1508        log.info("process_udp_packet%s", (udp_listener, uuid, seqno, packetsize, dataoffset, datalen, len(data), bfrom))
     1509        protocol = self._udp_protocols.get(uuid)
     1510        if not protocol:
     1511            from xpra.net.udp_protocol import UDPServerProtocol, UDPSocketConnection
     1512            def udp_protocol_class(conn):
     1513                protocol = UDPServerProtocol(self, conn, self.process_packet)
     1514                protocol.large_packets.append("info-response")
     1515                protocol.receive_aliases.update(self._aliases)
     1516                return protocol
     1517            socktype = udp_listener._socktype
     1518            host, port = bfrom
     1519            sock = udp_listener._socket
     1520            if socktype=="dtls":
     1521                from dtls import do_patch   #@UnresolvedImport
     1522                do_patch()
     1523                wrap_socket = self._ssl_wrap_client_socket(sock)
     1524                sock = wrap_socket(sock)
     1525                assert sock, "failed to wrap socket %s" % sock
     1526            sockname = sock.getsockname()
     1527            conn = UDPSocketConnection(sock, sockname, (host, port), (host, port), socktype)
     1528            conn.timeout = SOCKET_TIMEOUT
     1529            protocol = self.do_make_protocol(socktype, conn, udp_protocol_class)
     1530            self._udp_protocols[uuid] = protocol
     1531        assert packetsize==datalen
     1532        assert len(data)==datalen
     1533        protocol._read_queue_put(data)
  • xpra/server/socket_util.py

     
    108108        log("create_tcp_socket%s", (host, iport), exc_info=True)
    109109        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
    110110    def cleanup_tcp_socket():
    111         log.info("closing %s socket %s:%s", socktype, host, iport)
     111        log.info("closing %s socket %s:%s", socktype.lower(), host, iport)
    112112        try:
    113113            tcp_socket.close()
    114114        except:
     
    117117    log("%s: %s:%s : %s", socktype, host, iport, socket)
    118118    return socktype, tcp_socket, (host, iport)
    119119
     120def create_udp_socket(host, iport):
     121    if host.find(":")<0:
     122        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     123        sockaddr = (host, iport)
     124    else:
     125        assert socket.has_ipv6, "specified an IPv6 address but this is not supported"
     126        res = socket.getaddrinfo(host, iport, socket.AF_INET6, socket.SOCK_DGRAM)
     127        listener = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
     128        sockaddr = res[0][-1]
     129    listener.bind(sockaddr)
     130    return listener
    120131
    121 def parse_bind_tcp(bind_tcp):
    122     tcp_sockets = set()
    123     if bind_tcp:
    124         for spec in bind_tcp:
     132def setup_udp_socket(host, iport, socktype="udp"):
     133    from xpra.log import Logger
     134    log = Logger("network")
     135    try:
     136        udp_socket = create_udp_socket(host, iport)
     137    except Exception as e:
     138        log("create_udp_socket%s", (host, iport), exc_info=True)
     139        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
     140    def cleanup_udp_socket():
     141        log.info("closing %s socket %s:%s", socktype, host, iport)
     142        try:
     143            udp_socket.close()
     144        except:
     145            pass
     146    add_cleanup(cleanup_udp_socket)
     147    log("%s: %s:%s : %s", socktype, host, iport, socket)
     148    return socktype, udp_socket, (host, iport)
     149
     150
     151def parse_bind_ip(bind_ip):
     152    ip_sockets = set()
     153    if bind_ip:
     154        for spec in bind_ip:
    125155            if ":" not in spec:
    126                 raise InitException("TCP port must be specified as [HOST]:PORT")
     156                raise InitException("port must be specified as [HOST]:PORT")
    127157            host, port = spec.rsplit(":", 1)
    128158            if host == "":
    129159                host = "127.0.0.1"
     
    135165                    assert iport>0 and iport<2**16
    136166                except:
    137167                    raise InitException("invalid port number: %s" % port)
    138             tcp_sockets.add((host, iport))
    139     return tcp_sockets
     168            ip_sockets.add((host, iport))
     169    return ip_sockets
    140170
    141171def setup_vsock_socket(cid, iport):
    142172    from xpra.log import Logger