1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
import socket
from http.client import HTTPConnection
def websocket_request(
remote_agent_host, remote_agent_port, host=None, origin=None, path="/session"
):
real_host = f"{remote_agent_host}:{remote_agent_port}"
url = f"http://{real_host}{path}"
conn = HTTPConnection(real_host)
skip_host = host is not None
conn.putrequest("GET", url, skip_host)
if host is not None:
conn.putheader("Host", host)
if origin is not None:
conn.putheader("Origin", origin)
conn.putheader("Connection", "upgrade")
conn.putheader("Upgrade", "websocket")
conn.putheader("Sec-WebSocket-Key", "dGhlIHNhbXBsZSBub25jZQ==")
conn.putheader("Sec-WebSocket-Version", "13")
conn.endheaders()
return conn.getresponse()
def http_request(server_host, server_port, path="/status", host=None, origin=None):
url = f"http://{server_host}:{server_port}{path}"
conn = HTTPConnection(server_host, server_port)
custom_host = host is not None
conn.putrequest("GET", url, skip_host=custom_host)
if custom_host:
conn.putheader("Host", host)
if origin is not None:
conn.putheader("Origin", origin)
conn.endheaders()
return conn.getresponse()
def get_free_port():
"""Get a random unbound port"""
max_attempts = 10
err = None
for _ in range(max_attempts):
s = socket.socket()
try:
s.bind(("127.0.0.1", 0))
except OSError as e:
err = e
continue
else:
return s.getsockname()[1]
finally:
s.close()
if err is None:
err = Exception("Failed to get a free port")
raise err
def get_host(port_type, hostname, server_port):
if port_type == "default_port":
return hostname
if port_type == "server_port":
return f"{hostname}:{server_port}"
if port_type == "wrong_port":
wrong_port = int(server_port) + 1
return f"{hostname}:{wrong_port}"
raise Exception(f"Unrecognised port_type {port_type}")
|