xpra icon
Bug tracker and wiki

Ticket #474: multiplex-web.patch

File multiplex-web.patch, 10.0 KB (added by Antoine Martin, 5 years ago)

PoC patch which does the multiplexing for http get requests only in "ServerBase?"

  • xpra/net/protocol.py

     
    628628        self.close()
    629629        return False
    630630
    631     def _gibberish(self, msg, data):
     631    def gibberish(self, msg, data):
    632632        self.scheduler.idle_add(self._process_packet_cb, self, [Protocol.GIBBERISH, data])
    633633        # Then hang up:
    634634        self.scheduler.timeout_add(1000, self._connection_lost, msg)
    635635
     636    def _invalid_header(self, data):
     637        self.invalid_header(self, data)
    636638
     639    def invalid_header(self, proto, data):
     640        err = "invalid packet header byte: '%s'" % hex(ord(data[0]))
     641        if len(data)>1:
     642            err += " read buffer=0x%s" % repr_ellipsized(data)
     643        self.gibberish(err, data)
     644
    637645    def _read_parse_thread_loop(self):
    638646        debug("read_parse_thread_loop starting")
    639647        try:
     
    678686                if payload_size<0:
    679687                    head = read_buffer[:8]
    680688                    if read_buffer[0] not in ("P", ord("P")):
    681                         err = "invalid packet header byte: '%s', not an xpra client?" % hex(ord(read_buffer[0]))
    682                         if len(read_buffer)>1:
    683                             err += " read buffer=0x%s" % repr_ellipsized(read_buffer)
    684                             if len(read_buffer)>40:
    685                                 err += "..."
    686                         self._gibberish(err, head)
     689                        self._invalid_header(read_buffer)
    687690                        return
    688691                    if bl<8:
    689692                        break   #packet still too small
    690693                    #packet format: struct.pack('cBBBL', ...) - 8 bytes
    691                     try:
    692                         _, protocol_flags, compression_level, packet_index, data_size = unpack_header(head)
    693                     except Exception, e:
    694                         self._gibberish("failed to parse packet header: 0x%s: %s" % (repr_ellipsized(head), e), head)
    695                         return
     694                    _, protocol_flags, compression_level, packet_index, data_size = unpack_header(head)
     695
    696696                    read_buffer = read_buffer[8:]
    697697                    bl = len(read_buffer)
    698698                    if protocol_flags & Protocol.FLAGS_CIPHER:
    699699                        if self.cipher_in_block_size==0 or not self.cipher_in_name:
    700700                            err = "received cipher block but we don't have a cipher do decrypt it with, not an xpra client?"
    701                             self._gibberish(err, head)
     701                            self.gibberish(err, head)
    702702                            return
    703703                        padding = (self.cipher_in_block_size - data_size % self.cipher_in_block_size) * " "
    704704                        payload_size = data_size + len(padding)
     
    792792                    if self._closed:
    793793                        return
    794794                    msg = "gibberish received: %s, packet index=%s, packet size=%s, buffer size=%s, error=%s" % (repr_ellipsized(data), packet_index, payload_size, bl, e)
    795                     self._gibberish(msg, data)
     795                    self.gibberish(msg, data)
    796796                    return
    797797
    798798                if self._closed:
  • xpra/server/proxy.py

     
    66
    77import threading
    88
    9 from xpra.log import Logger
     9from xpra.log import Logger, debug_if_env
    1010log = Logger()
     11debug = debug_if_env(log, "XPRA_PROXY_DEBUG")
    1112from xpra.net.bytestreams import untilConcludes
    1213
     14
    1315class XpraProxy(object):
    1416    """
    1517        This is the proxy command that runs
     
    2729        self._to_server = threading.Thread(target=self._to_server_loop)
    2830
    2931    def run(self):
     32        debug("XpraProxy.run()")
    3033        self._to_client.start()
    3134        self._to_server.start()
    3235        self._to_client.join()
    3336        self._to_server.join()
     37        debug("XpraProxy.run() ended")
    3438
    3539    def _to_client_loop(self):
    3640        self._copy_loop("<-server", self._server_conn, self._client_conn)
     
    4145        self._to_client._Thread__stop()
    4246
    4347    def _copy_loop(self, log_name, from_conn, to_conn):
     48        debug("XpraProxy._copy_loop(%s, %s, %s)", log_name, from_conn, to_conn)
    4449        try:
    4550            while not self._closed:
    46                 log("%s: waiting for data", log_name)
     51                debug("%s: waiting for data", log_name)
    4752                buf = untilConcludes(self.is_active, from_conn.read, 4096)
    4853                if not buf:
    49                     log("%s: connection lost", log_name)
     54                    debug("%s: connection lost", log_name)
    5055                    self.quit()
    5156                    return
    5257                while buf and not self._closed:
    53                     log("%s: writing %s bytes", log_name, len(buf))
     58                    debug("%s: writing %s bytes", log_name, len(buf))
    5459                    written = untilConcludes(self.is_active, to_conn.write, buf)
    5560                    buf = buf[written:]
    5661        except Exception, e:
    57             log("%s: %s", log_name, e)
     62            debug("%s: %s", log_name, e)
    5863            self.quit()
    5964
    6065    def is_active(self):
     66        debug("XpraProxy.is_active()=%s", not self._closed)
    6167        return not self._closed
    6268
    6369    def quit(self, *args):
    64         log("closing proxy connections")
     70        debug("XpraProxy.quit(%s) closing proxy connections", args)
    6571        self._closed = True
    6672        try:
    6773            self._client_conn.close()
  • xpra/server/server_base.py

     
    4242
    4343        # This must happen early, before loading in windows at least:
    4444        self._server_sources = {}
     45        self._web_clients = {}
     46        self._web_server = "192.168.42.100:8000"
     47        self._socket_timeout = 0.1
    4548
    4649        #so clients can store persistent attributes on windows:
    4750        self.client_properties = {}
     
    358361        packet_types += list(self._authenticated_ui_packet_handlers.keys())
    359362        self.do_init_aliases(packet_types)
    360363
     364    def invalid_header(self, proto, data):
     365        if proto.input_packetcount==0 and self._web_server:
     366            #look for http get:
     367            if data[:4]=="GET ":
     368                self.start_web_proxy(proto, data)
     369                return
     370        ServerCore.invalid_header(self, proto, data)
    361371
     372    def start_web_proxy(self, proto, data):
     373        log.info("start_web_proxy(%s)", proto)
     374        client_connection = proto.steal_connection()
     375        self._potential_protocols.remove(proto)
     376        from xpra.scripts.main import _socket_connect
     377        from xpra.daemon_thread import make_daemon_thread
     378        from xpra.server.proxy import XpraProxy
     379        import socket
     380        #connect to web server:
     381        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     382        sock.settimeout(10)
     383        host, port = self._web_server.split(":", 1)
     384        web_server_connection = _socket_connect(sock, (host, int(port)), "web-proxy-for-%s" % proto, "tcp")
     385        log.info("proxy connected to web server at %s:%s : %s", host, port, web_server_connection)
     386        log.info("sending data=%s", data)
     387        web_server_connection.write(data)
     388        p = XpraProxy(client_connection, web_server_connection)
     389        self._web_clients[proto] = p
     390        t = make_daemon_thread(p.run, "web-proxy-for-%s" % proto)
     391        t.start()
     392
     393
    362394    def cleanup(self, *args):
     395        log("cleanup() stopping %s web clients: %s", len(self._web_clients), self._web_clients)
     396        for p in self._web_clients.values():
     397            p.quit()
    363398        if self.notifications_forwarder:
    364399            thread.start_new_thread(self.notifications_forwarder.release, ())
    365400            self.notifications_forwarder = None
     
    406441        return source
    407442
    408443    def verify_connection_accepted(self, protocol):
    409         if not protocol._closed and protocol in self._potential_protocols and protocol not in self._server_sources:
     444        if not protocol._closed and protocol in self._potential_protocols and \
     445            protocol not in self._server_sources and protocol not in self._web_clients:
    410446            log.error("connection timedout: %s", protocol)
    411447            self.send_disconnect(protocol, "login timeout")
    412448
  • xpra/server/server_core.py

     
    2626from xpra.net.bytestreams import SocketConnection
    2727from xpra.os_util import set_application_name, load_binary_file, get_machine_id, get_user_uuid, SIGNAMES
    2828from xpra.version_util import version_compat_check, add_version_info, get_platform_info
    29 from xpra.net.protocol import Protocol, use_lz4, use_rencode, new_cipher_caps, get_network_caps
     29from xpra.net.protocol import Protocol, use_lz4, use_rencode, new_cipher_caps, get_network_caps, repr_ellipsized
    3030from xpra.server.background_worker import stop_worker
    3131from xpra.util import typedict
    3232
     
    291291        protocol = Protocol(self, sc, self.process_packet)
    292292        protocol.large_packets.append("info-response")
    293293        protocol.authenticator = None
     294        protocol.invalid_header = self.invalid_header
    294295        self._potential_protocols.append(protocol)
    295296        protocol.start()
    296297        self.timeout_add(SOCKET_TIMEOUT*1000, self.verify_connection_accepted, protocol)
    297298        return True
    298299
     300    def invalid_header(self, proto, data):
     301        log.warn("invalid_header(%s, %s)", proto, repr_ellipsized(data))
     302        err = "invalid packet header byte: '%s', not an xpra client?" % hex(ord(data[0]))
     303        if len(data)>1:
     304            err += " read buffer=0x%s" % repr_ellipsized(data)
     305        proto.gibberish(err, data)
     306
     307    def add_protocol(self, protocol):
     308        self._potential_protocols.append(protocol)
     309
    299310    def verify_connection_accepted(self, protocol):
    300311        raise NotImplementedError()
    301312