xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued;
please use https://github.com/Xpra-org/xpra instead.


Ticket #1486: xpra_lb.py

File xpra_lb.py, 7.3 KB (added by Denis01, 5 years ago)
Line 
1# This file is part of Xpra.
2# Copyright (C) 2013-2017 Antoine Martin <antoine@devloop.org.uk>
3# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
4# later version. See the file COPYING for details.
5
6import os
7import sqlite3
8import paramiko # yum -y install epel-release # yum repolist # yum -y install python-pip #pip install --upgrade pip #sudo yum install gcc libffi-devel python-devel openssl-devel #pip install paramiko
9import re
10import sys
11import socket
12
13
14from xpra.log import Logger
15log = Logger("proxy")
16authlog = Logger("proxy", "auth")
17
18
def _lb_open_ports(nmap_output):
    """Return the TCP ports (as strings) that an nmap report lists as open.

    Only lines of the form ``<port>/tcp open ...`` are counted: nmap also
    prints closed/filtered ports individually when the scanned range is
    small, and those must not be treated as running sessions.
    """
    if isinstance(nmap_output, bytes):
        # paramiko channels return bytes on Python 3
        nmap_output = nmap_output.decode("utf-8", "replace")
    found = set(m.group(1) for m in re.finditer(r"(\d+)/tcp\s+open\b", nmap_output))
    return sorted(found, key=int)


def _lb_scan_busy_ports(admin_row, hosts):
    """Run nmap from the admin server against every target host.

    Returns a dict mapping each target host address (``row[0]``) to the list
    of open ports found in its ``row[1]``-``row[2]`` range.  The SSH
    connection is always closed, even when a command fails.
    """
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification; consider loading known host keys instead.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    busy = {}
    try:
        client.connect(hostname=admin_row[0], username=admin_row[3],
                       password=admin_row[4], port=int(admin_row[6]))
        for row in hosts:
            ip_serv = row[0]
            comm_string = 'nmap ' + ip_serv + ' -p ' + str(row[1]) + '-' + str(row[2])
            _stdin, stdout, stderr = client.exec_command(comm_string)
            data = stdout.read() + stderr.read()
            busy.setdefault(ip_serv, []).extend(_lb_open_ports(data))
    finally:
        client.close()
    return busy


def find_port(db_name, user_id):
    """Choose the target host and port for a user's session.

    The ``busy_ports`` table is rebuilt from an nmap scan executed over SSH
    on the admin server (the ``hosts`` row with ``adminserv='y'``) against
    every target host (``hosts`` rows with ``nodserver='y'``).  Sessions in
    ``open_sessions`` whose port is no longer open are then removed.

    Column positions of ``hosts`` rows are used as in the rest of this file:
    0 = host address, 1 = first port, 2 = last port, 3 = SSH user,
    4 = SSH password, 6 = SSH port (the schema itself is defined elsewhere).

    Returns a ``(host, port)`` tuple of strings: the user's existing session
    if one is still alive, otherwise the lowest free port on the target host
    with the fewest open ports.  Returns ``None`` when the database cannot
    be read, no admin/target host is configured, or no port is free.
    Exceptions raised by paramiko are not caught here.
    """
    conn = sqlite3.connect(db_name)
    try:
        cursor = conn.cursor()

        # busy_ports - table with sessions opened on host servers
        try:
            cursor.execute("delete from busy_ports")
            conn.commit()
        except sqlite3.DatabaseError as err:
            log.error("LB. Prb with table-busy_ports:'%s'", err)
            return None

        # admin server (runs the scan) and the target hosts to be scanned
        try:
            cursor.execute("select * from hosts where adminserv='y'")
            host_admin_data = cursor.fetchone()
            cursor.execute("select * from hosts where nodserver='y'")
            hosts = cursor.fetchall()
        except sqlite3.DatabaseError as err:
            log.error("LB. Prb with table-hosts:'%s'", err)
            return None
        if not host_admin_data:
            log.error("LB. No admin server defined")
            return None
        if not hosts:
            log.error("LB. No server as target host defined")
            return None

        busy = _lb_scan_busy_ports(host_admin_data, hosts)

        # The stale-session cleanup must only run once busy_ports holds the
        # result for *every* host: with a partial table, sessions living on
        # hosts that were not scanned yet would look closed and be deleted.
        try:
            rows = [(ip_serv, port) for ip_serv, ports in busy.items() for port in ports]
            if rows:
                # ports are stored as text, matching what open_sessions holds
                cursor.executemany("INSERT INTO busy_ports (ip_host, port) VALUES (?,?)", rows)
            cursor.execute("delete from open_sessions where user in (select user from open_sessions left join busy_ports on open_sessions.ip_host=busy_ports.ip_host and open_sessions.port=busy_ports.port where busy_ports.port is NULL)")
            conn.commit()
            cursor.execute("select * from open_sessions where user=(?)", (user_id,))
            conn_alive = cursor.fetchone()
        except sqlite3.DatabaseError as err:
            log.error("LB. Prb with tables:'%s'", err)
            return None

        if conn_alive:
            log("LB. User already connected")
            return str(conn_alive[0]), str(conn_alive[1])

        # Least-loaded host that still has a free port.  Idle hosts have no
        # row in busy_ports, so the ranking is done over the hosts list and
        # ports are compared as integers (text comparison would rank
        # "10000" before "9999").
        candidates = []
        for row in hosts:
            taken = set(int(p) for p in busy.get(row[0], ()))
            free = [p for p in range(int(row[1]), int(row[2]) + 1) if p not in taken]
            if free:
                candidates.append((len(taken), str(row[0]), free[0]))
        if not candidates:
            log.error("LB. No free port on any target host")
            return None

        _load, ip_serv_1, port_to_connect = min(candidates)
        port_to_connect = str(port_to_connect)
        log("LB. Host selected with lowest connection sessions number:'%s'", ip_serv_1)
        log("LB. Port selected:'%s'", port_to_connect)
        return ip_serv_1, port_to_connect
    finally:
        conn.close()
153
154
def create_session(db_name, user_id, ip_server, port):
    """Start an xpra session for ``user_id`` on ``ip_server``:``port``.

    Looks the user up in the ``users`` table, and if the user has no row in
    ``open_sessions`` yet, connects to ``ip_server`` over SSH as ``user_id``
    (password taken from column 2 of the user's row, SSH port from
    ``hosts.adminport``), runs ``xpra start --bind-tcp=0.0.0.0:<port>``
    followed by column 1 of the user's row, and records the session in
    ``open_sessions``.

    Returns ``(user, host, port)``.  For an already recorded session the
    values come from the ``open_sessions`` row (as strings); for a new one
    they are the arguments as given.  Returns ``None`` if the database file
    is missing, a query fails, or the user or target host is unknown.
    Exceptions raised by paramiko are not caught here.
    """
    if not os.path.exists(db_name):
        log.error("LB. No database: '%s'", db_name)
        return None
    log("LB. DB - OK: '%s'", db_name)

    conn = sqlite3.connect(db_name)
    try:
        cursor = conn.cursor()

        # the user must exist; the same row also carries the login data
        try:
            cursor.execute("select * from users where user_ipn=(?)", (user_id,))
            conn_data = cursor.fetchone()
        except sqlite3.DatabaseError as err:
            log.error("LB. DB problem:'%s'", err)
            return None
        if not conn_data:
            log.error("LB. No user known:'%s'", user_id)
            return None

        # reuse the session if one is already recorded for this user
        try:
            cursor.execute("select * from open_sessions where user=(?)", (user_id,))
            session_created = cursor.fetchone()
        except sqlite3.DatabaseError as err:
            log.error("LB. DB problem:'%s'", err)
            return None
        if session_created:
            return str(session_created[2]), str(session_created[0]), str(session_created[1])

        # find ssh connection port on target_host
        try:
            cursor.execute("select adminport from hosts where host_ip=(?)", (ip_server,))
            target_host_port = cursor.fetchone()
        except sqlite3.DatabaseError as err:
            log.error("LB. DB problem:'%s'", err)
            return None
        if not target_host_port:
            log.error("LB. No target host known:'%s'", ip_server)
            return None
        target_ssh_port = int(target_host_port[0])

        host_password = conn_data[2]
        comm_string = "xpra start --bind-tcp=0.0.0.0:" + str(port) + " " + str(conn_data[1])
        log("LB. Start command %s", comm_string)

        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; consider loading known host keys instead.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(hostname=ip_server, username=user_id,
                           password=host_password, port=target_ssh_port)
            _stdin, stdout, stderr = client.exec_command(comm_string)
            # reading both streams to the end waits for the command to finish
            data = stdout.read() + stderr.read()
        finally:
            client.close()
        log("LB. Start command output: %s", data)

        # write open session info
        cursor.execute("INSERT INTO open_sessions (ip_host, port, user) VALUES (?,?,?)",
                       (ip_server, port, user_id))
        conn.commit()
    finally:
        conn.close()

    log("Session for '%s' on '%s':'%s' is created", user_id, ip_server, port)
    return user_id, ip_server, port
231
232
233