xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued
please use https://github.com/Xpra-org/xpra instead.


Ticket #838: fix-steal-connection-race.patch

File fix-steal-connection-race.patch, 5.1 KB (added by Antoine Martin, 6 years ago)

a possible fix - not pretty

  • xpra/net/protocol.py

     
    104104            self._process_packet_cb = process_packet_cb
    105105        self._write_queue = Queue(1)
    106106        self._read_queue = Queue(20)
     107        self._read_queue_put = self._read_queue.put
    107108        # Invariant: if .source is None, then _source_has_more == False
    108109        self._get_packet_cb = get_packet_cb
    109110        #counters:
     
    165166        exited = True
    166167        for t in (self._read_thread, self._write_thread):
    167168            if t.isAlive():
     169                log.warn("%s thread of %s has not yet exited", t.name, self._conn)
    168170                exited = False
    169171                break
    170172        return exited
     
    549551    def _read(self):
    550552        buf = self._conn.read(READ_BUFFER_SIZE)
    551553        #log("read thread: got data of size %s: %s", len(buf), repr_ellipsized(buf))
    552         self._read_queue.put(buf)
     554        #add to the read queue (or whatever takes its place - see steal_connection)
     555        self._read_queue_put(buf)
    553556        if not buf:
    554557            log("read thread: eof")
    555558            self.close()
     
    871874        self.terminate_queue_threads()
    872875        self.idle_add(self.clean)
    873876
    874     def steal_connection(self):
     877    def steal_connection(self, read_callback):
    875878        #so we can re-use this connection somewhere else
    876879        #(frees all protocol threads and resources)
     880        #the read_callback may get called with whatever buffer was being read
    877881        assert not self._closed
     882        self._read_queue_put = read_callback
    878883        conn = self._conn
    879884        self._closed = True
    880885        self._conn = None
     886        if conn:
     887            conn.set_active(False)
    881888        self.terminate_queue_threads()
    882889        return conn
    883890
  • xpra/server/server_core.py

     
    2929from xpra.scripts.server import deadly_signal
    3030from xpra.net.bytestreams import SocketConnection
    3131from xpra.platform import set_application_name
    32 from xpra.os_util import load_binary_file, get_machine_id, get_user_uuid, SIGNAMES
     32from xpra.os_util import load_binary_file, get_machine_id, get_user_uuid, SIGNAMES, Queue
    3333from xpra.version_util import version_compat_check, get_version_info, get_platform_info, get_host_info, local_version
    3434from xpra.net.protocol import Protocol, get_network_caps, sanity_checks
    3535from xpra.net.crypto import new_cipher_caps
     
    365365        sockname = sock.getsockname()
    366366        target = peername or sockname
    367367        sock.settimeout(self._socket_timeout)
    368         netlog("new_connection(%s) sock=%s, timeout=%s, sockname=%s, address=%s, peername=%s", args, sock, self._socket_timeout, sockname, address, peername)
     368        netlog.info("new_connection(%s) sock=%s, timeout=%s, sockname=%s, address=%s, peername=%s", args, sock, self._socket_timeout, sockname, address, peername)
    369369        sc = SocketConnection(sock, sockname, address, target, socktype)
    370370        netlog("socket connection: %s", sc)
    371371        frominfo = ""
     
    403403        proxylog("start_tcp_proxy(%s, '%s')", proto, repr_ellipsized(data))
    404404        self._potential_protocols.remove(proto)
    405405        proxylog("start_tcp_proxy: protocol state before stealing: %s", proto.get_info(alias_info=False))
    406         client_connection = proto.steal_connection()
     406        #any buffers read after we steal the connection will be placed in this queue:
     407        temp_read_buffer = Queue()
     408        client_connection = proto.steal_connection(temp_read_buffer.put)
     409
    407410        #connect to web server:
    408411        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    409412        sock.settimeout(10)
     
    415418            proto.gibberish("invalid packet header", data)
    416419            return
    417420        proxylog("proxy connected to tcp server at %s:%s : %s", host, port, web_server_connection)
    418         sock.settimeout(0.5)
    419         self.set_socket_timeout(client_connection, 0.5)
     421        sock.settimeout(self._socket_timeout)       #could be 0 / None?
     422        self.set_socket_timeout(client_connection, self._socket_timeout)
     423
     424        ioe = proto.wait_for_io_threads_exit(0.5+self._socket_timeout)
     425        if not ioe:
     426            proxylog.warn("proxy failed to stop all existing network threads!")
     427
     428        #now we own it, we can start it again:
     429        client_connection.set_active(True)
     430        proxylog("pushing initial buffer to its new destination: %s", repr_ellipsized(data))
    420431        web_server_connection.write(data)
     432        while not temp_read_buffer.empty():
     433            buf = temp_read_buffer.get()
     434            if buf:
     435                proxylog("pushing read buffer to its new destination: %s", repr_ellipsized(buf))
     436                web_server_connection.write(buf)
    421437        p = XpraProxy(client_connection.target, client_connection, web_server_connection)
    422438        self._tcp_proxy_clients.append(p)
    423439        proxylog.info("client connection from %s forwarded to proxy server on %s:%s", client_connection.target, host, port)