-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.py
126 lines (109 loc) · 4.38 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import time
import sys
import re
try:
import paramiko
except ImportError:
print("paramiko is not installed, please use pip install paramiko")
sys.exit(-1)
class Connection:
def __init__(self, ip, username, password, port=22, retry=10) -> None:
self._linux_host = re.compile("^\[(\w|-)+@(\w|-)+ (\w|-|/|~)+\]#", re.IGNORECASE) # 匹配 linux
self._device_host = re.compile("^(\w|-)+ \(?(\w|-)*\)? ?#", re.IGNORECASE) # 匹配 网络设备
self.retry = retry
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
self.client.connect(ip, port=port, username=username, password=password)
self.shell= self.client.invoke_shell()
response = self.wait_response().decode()
# 正则捕获hostname,只能捕获网络设备
self.hostname = self.check_hostname(response.split("\n")[-1])
print("Successfully login.\nIP: {}\nHost name: {}".format(ip, self.hostname))
print("Start receiving:")
print(40 * '-')
print(response, end="")
except paramiko.AuthenticationException as e:
print(e)
self.client.close()
sys.exit(-1)
def wait_response(self):
attempt = 0
while True:
time.sleep(0.5)
if self.shell.recv_ready() or self.shell.recv_stderr_ready():
response = self.shell.recv(4096)
break
attempt += 1
if attempt >= self.retry:
response = b''
break
return response
def recv_all(self, confirm_flag=None):
output = ""
while True:
response = self.wait_response().decode()
if not response:
break
output += response
# 循环匹配是否出现标记字符,出现则发送确认字符
if confirm_flag:
confirm_word = ""
for flag in confirm_flag:
if flag in response:
confirm_word = confirm_flag.get(flag)
if not isinstance(confirm_word, bytes):
confirm_word = bytes(confirm_word, encoding="utf-8")
confirm_word += b'\n'
self.shell.send(confirm_word)
break
if confirm_word:
continue
# 匹配是否出现More,出现则发送空格字符
if "More" in response:
self.shell.send(b"\x20")
continue
return output
def send_one_command(self, command, confirm_flag=None):
if command[-1:] != '\n':
command += '\n'
command = bytes(command, encoding="utf-8")
self.shell.send(command)
stdout = self.recv_all(confirm_flag=confirm_flag)
return stdout
def send_command_file(self, command_file, confirm_flag=None):
with open(command_file, 'r', encoding="utf-8") as f:
commands = f.readlines()
for cmd in commands:
yield self.send_one_command(cmd, confirm_flag=confirm_flag)
def check_hostname(self, recv):
if self._linux_host.match(recv):
return self._linux_host.match(recv)[0].split(" ")[0].split("@")[1]
if self._device_host.match(recv):
return self._device_host.match(recv)[0].split(" ")[0]
return None
def close(self):
if self.client is not None:
self.client.close()
if __name__ == "__main__":
ip = input("IP_ADDRESS: ")
username = input("username: ")
try:
from getpass import getpass
password = getpass("password: ")
except ImportError:
print("getpass is not installed.")
password = input("password: ")
client = Connection(ip, username=username, password=password)
# 逐行读取在远程执行的命令
command_file = "doc/centos_nginx"
# 捕获额外需要输入的提示,返回对应的输入
confirm_flag = {
"'s password:": "centos",
"Enter expert password:": "centos",
"Are you sure you want to continue?(Y/N)[N]": "y"
}
# 实时返回远程输出信息
for stdout in client.send_command_file(command_file, confirm_flag=confirm_flag):
print(stdout, end="")
client.close()