-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks.py
82 lines (68 loc) · 2.5 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
import logging
import pickle
import codecs
import random
import google.auth
from google.appengine.ext import ndb
from flask import Blueprint, request
from progress import TaskProgress
#from dbcontext import dbcontext
#from memcache_wrapper import memcache
# Flask blueprint named 'tasks', created with template_folder='templates'.
# This assignment is the only live statement in the remainder of the module.
tasks = Blueprint('tasks', __name__, template_folder='templates')

# NOTE: everything inside the triple-quoted string below is DISABLED code.
# The string is a bare expression statement, so neither create_task() nor
# task_handler() is defined at import time and no '/handler/' route is
# registered on the blueprint.
#
# What the disabled code sketches:
#   * create_task(func, **kwargs): generates a nonce, stores it in memcache
#     under 'task:<nonce>' with an expiry one hour ahead, then builds a task
#     that POSTs func.__name__, the base64-encoded pickle of kwargs and the
#     nonce to '/tasks/handler/'.
#   * task_handler(): reads the JSON body, returns 200 with
#     'task cookie missing' when the nonce is not in memcache, otherwise
#     removes the nonce, unpickles the arguments and calls the function
#     found by name in globals()/locals().
#
# NOTE(review): points to resolve before re-enabling any of it:
#   * tasks_v2, memcache and dbcontext are referenced but not imported in the
#     visible part of this module (the memcache/dbcontext imports are
#     commented out above).
#   * The header key is spelled 'Content-Type:' with a trailing colon.
#   * create_task passes both request= and task= to
#     tasks_client.create_task(); presumably only one form is accepted --
#     confirm against the Cloud Tasks client documentation.
#   * The handler feeds request data to pickle.loads() and resolves the
#     target by name from globals(); the memcache nonce is the only guard
#     visible here, so treat the endpoint as security-sensitive.
#   * The nonce comes from random.randbytes(); consider the secrets module
#     for a value used as an authorization token.
#   * As it stands here the text has no indentation, so it cannot be pasted
#     back as code verbatim.
"""
def create_task(func, **kwargs):
logging.info("starting create_task")
tasks_client = tasks_v2.CloudTasksClient()
http_method=tasks_v2.HttpMethod.POST
_, PROJECT_ID = google.auth.default()
REGION_ID = 'europe-west1'
QUEUE_NAME = 'default'
QUEUE_PATH = tasks_client.queue_path(PROJECT_ID, REGION_ID, QUEUE_NAME)
# Initialize request argument(s)
request = tasks_v2.CreateTaskRequest(
parent=QUEUE_PATH,
)
client = tasks_v2.CloudTasksClient()
logging.info("get queue for task")
build = client.get_queue(request={'name': "default"})
#for q in tasks_client.list_queues(parent=QUEUE_PATH):
# print(q.name)
task_nonce = codecs.encode(random.randbytes(33), "base64").decode()
expires = datetime.utcnow() + timedelta(hours=1)
memcache.set('task:'+task_nonce, '', expires)
task = {
'app_engine_http_request': {
'relative_uri': '/tasks/handler/',
"http_method": tasks_v2.HttpMethod.POST,
'body': {'_func_name': func.__name__,
'args': codecs.encode(pickle.dumps(kwargs), "base64").decode(),
'nonce': task_nonce,
},
'headers': {
'Content-Type:': 'application/json',
},
}
}
logging.info("creating task")
response = tasks_client.create_task(request=request, task=task)
@tasks.route('/handler/', methods=['POST'])
@dbcontext
def task_handler():
logging.info("task handler")
request_json = request.get_json()
method_name = request_json.get('_func_name')
task_nonce = request_json.get('nonce')
if memcache.get('task:'+task_nonce) is None:
return 'task cookie missing', 200 # return 200 to remove the task from the queue
memcache.remove('task:'+task_nonce)
kwargs = pickle.loads(codecs.decode(request_json.get('args').encode(), "base64"))
possibles = globals().copy()
possibles.update(locals())
method = possibles.get(method_name)
if not method:
raise NotImplementedError("Method %s not implemented" % method_name)
method(**kwargs)
return "", 200
"""