forked from cockpit-project/bots
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun-queue
executable file
·125 lines (106 loc) · 4.75 KB
/
run-queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python3
# This file is part of Cockpit.
#
# Copyright (C) 2018 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.
MAX_PRIORITY = 9
import argparse
import json
import random
import subprocess
import sys
import time
import logging
import pipes
from task import redhat_network, distributed_queue
logging.basicConfig(level=logging.INFO)
# Returns a command to execute and the delivery tag needed to ack the message
def consume_webhook_queue(channel, amqp):
# interpret payload
# call tests-scan or issue-scan appropriately
method_frame, header_frame, body = channel.basic_get(queue='webhook')
if not method_frame:
return None, None
body = json.loads(body)
event = body['event']
request = body['request']
repo = None
cmd = None
if event == 'pull_request':
pull_request = request['pull_request']
action = request['action']
# scan for body changes (edited) and the bots label
if action == 'labeled' or (action == 'edited' and 'body' in request.get('changes', {})):
cmd = ['bots/issue-scan', '--issues-data', json.dumps(request), '--amqp', amqp]
if request['action'] in ['opened', 'synchronize']:
repo = pull_request['base']['repo']['full_name']
cmd = ['bots/tests-scan', '--pull-data', json.dumps(request), '--amqp', amqp, '--repo', repo]
elif event == 'status':
repo = request['repository']['full_name']
sha = request['sha']
context = pipes.quote(request['context'])
cmd = ['bots/tests-scan', '--sha', sha, '--amqp', amqp, '--context', context, '--repo', repo]
elif event == 'issues':
action = request['action']
# scan for opened, body changes (edited), and the bots label
if action in ['opened', 'labeled'] or (action == 'edited' and 'body' in request.get('changes', {})):
cmd = ['bots/issue-scan', '--issues-data', json.dumps(request), '--amqp', amqp]
else:
logging.error('Unkown event type in the webhook queue')
return None, None
return cmd, method_frame.delivery_tag
# Returns a command to execute and the delivery tag needed to ack the message
def consume_task_queue(channel, amqp, declare_public_result, declare_rhel_result):
queue='public'
if redhat_network():
# Try the rhel queue if the public queue is empty
if declare_public_result.method.message_count == 0:
queue = 'rhel'
# If both are non-empty, shuffle
elif declare_rhel_result.method.message_count > 0:
queue = ['public', 'rhel'][random.randrange(2)]
method_frame, header_frame, body = channel.basic_get(queue=queue)
if not method_frame:
return None, None
body = json.loads(body)
return body['command'], method_frame.delivery_tag
# Consume from the webhook queue and republish to the task queue via *-scan
# Consume from the task queue (endpoint)
def main():
parser = argparse.ArgumentParser(description='Bot: read a single test command from the queue and execute it')
parser.add_argument('--amqp', default='localhost:5671',
help='The host:port of the AMQP server to consume from (default: %(default)s)')
opts = parser.parse_args()
with distributed_queue.DistributedQueue(opts.amqp, ['webhook', 'rhel', 'public']) as q:
channel = q.channel
cmd, delivery_tag = consume_webhook_queue(channel, opts.amqp)
if not cmd and delivery_tag:
logging.info("Webhook message interpretation resulted in nop")
channel.basic_ack(delivery_tag)
return 0
if not cmd:
cmd, delivery_tag = consume_task_queue(channel, opts.amqp, q.declare_results['public'], q.declare_results['rhel'])
if not cmd:
logging.info("All queues are empty")
return 1
logging.info("Consuming message with command: {0}\n".format(' '.join(cmd) if type(cmd) is list else cmd))
p = subprocess.Popen(cmd, shell=type(cmd) is str)
while p.poll() is None:
q.connection.process_data_events()
time.sleep(3)
channel.basic_ack(delivery_tag)
return 0
if __name__ == '__main__':
sys.exit(main())