Обычно при запуске веб-проектов на python асинхронные задачи в фоне выполняются при помощи Celery. Но в этот раз у меня на сервере был uwsgi, а разворачивать celery для одной-двух задач мне не хотелось. Поэтому решил посмотреть на uwsgidecorators
и его spool
.
Проект на сервере разработан на django, но запускать такую большую штуку только для тестирования spool
было перебором, а тут рядом была открыта страница с документацией по Falcon. Вот так у меня появился подопытный фреймворк.
Быстро написал скрипт для запуска задачи в фоне. Задача запускалась, в терминал через каждую секунду асинхронно сыпались print('hi')
. После этого стал налаживать обмен данными между обработчиком запроса и задачей. Придумал "задачу", в которой обработчик запроса должен создавать в словаре хеш от даты, а потом асинхронно считать сумму кодов символов в этом хеше.
Первый пример, который должен был работать
import hashlib import json from datetime import datetime import falcon from uwsgidecorators import spool storage = {} @spool def _task(arguments): key = arguments['key'] storage[key] = sum(ord(ch) for ch in key) class Handler(object): def on_get(self, req, resp): key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest() storage[key] = None _task.spool(key=key) resp.body = json.dumps({'storage': storage}) serv = falcon.API() serv.add_route('/', Handler())
Запуск сервера из командной строки
uwsgi --http :9111 --wsgi-file sample.py --callable serv --master --spooler ./spool
Но пример не работал, а выдавал в браузер:
{ "storage": { "77e609faa5ca12e6fc37273bf07f9308": null, "0cfc5e83db37eb0eedf989aa0e894f63": null, } }
Закралось подозрение, что uwsgi создаёт разные объекты для _storage. Проверил id:
@spool def _task(arguments): key = arguments['key'] _storage[key] = sum(ord(ch) for ch in key) print('=' * 80) print(id(_storage)) print(_storage) print('=' * 80) class Handler(object): def on_get(self, req, resp): key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest() _storage[key] = None _task.spool(key=key) resp.body = json.dumps({'storage': _storage}) print('-' * 80) print(id(_storage)) print(_storage) print('-' * 80)
ID объектов были одинаковые.
-------------------------------------------------------------------------------- 140613325993160 {'7ca17ee5c8cd055f8cebd19c0e44a393': None, '8c7e86712679f74574396e02c6af5e4e': None} -------------------------------------------------------------------------------- ================================================================================ 140613325993160 {'7ca17ee5c8cd055f8cebd19c0e44a393': 2388, '8c7e86712679f74574396e02c6af5e4e': 2130} ================================================================================
Но где-то же была ошибка. Проверил номер процесса:
@spool def _task(arguments): key = arguments['key'] print('=' * 80) print(os.getpid()) print('=' * 80) _storage[key] = sum(ord(ch) for ch in key) class Handler(object): def on_get(self, req, resp): key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest() _storage[key] = None _task.spool(key=key) resp.body = json.dumps({'storage': _storage}) print('-' * 80) print(os.getpid()) print('-' * 80)
А вот номера процессов отличались. Как так получилось, что совпали айдишники объектов - не знаю. Нужно посмотреть, как конкретно запускается spooler в uwsgi.
Вот так не получилось по-быстрому сделать пример обмена данными между задачей и обработчиком запроса. Тут стоит сказать, что пересылать между ними обычные объекты не получится. Но азарт заставил задуматься над способом обмена данными.
В живом проекте для этих целей, вероятнее всего, использовалась бы база данных. Здесь выдумывать что-то другое не стал, взял sqlite, к которой с давних пор отношусь с теплотой.
Для начала разделил задачи и приложение на два отдельных файла, раз уж под них идут разные процессы.
# -*- coding: utf-8 -*- import falcon import hashlib import json import sqlite3 from datetime import datetime from tasks import hashsum class Handler(object): def __init__(self): self.db = sqlite3.connect('storage.sqlite3') cur = self.db.cursor() cur.execute('CREATE TABLE IF NOT EXISTS keys (key TEXT, val INTEGER)') self.db.commit() def add_key(self, key): cur = self.db.cursor() cur.execute('INSERT INTO keys (key) values(?)', (key, )) self.db.commit() def get_data(self): data = {} cur = self.db.cursor() for key, val in cur.execute('SELECT key, val FROM keys'): data[key] = val self.db.commit() return data def on_get(self, req, resp): key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest() self.add_key(key) resp.body = json.dumps({'db': self.get_data()}) hashsum.spool(key=key) serv = falcon.API() serv.add_route('/', Handler())
# -*- coding: utf-8 -*- import sqlite3 from uwsgidecorators import spool @spool def hashsum(arguments): key = arguments['key'] val = sum(ord(ch) for ch in key) db = sqlite3.connect('storage.sqlite3') cur = db.cursor() cur.execute('UPDATE keys SET val=? WHERE key=?', (val, key)) db.commit() db.close()
Тут я увидел, что текущая директория для процессов разная, как результат два разных файла базы данных. Нужно было выносить настройки в отдельный файл. Так появился settings.py
import os DB_NAME = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'storage.sqlite3')
# -*- coding: utf-8 -*- import sqlite3 from uwsgidecorators import spool from settings import DB_NAME @spool def hashsum(arguments): key = arguments['key'] val = sum(ord(ch) for ch in key) db = sqlite3.connect(DB_NAME) cur = db.cursor() cur.execute('UPDATE keys SET val=? WHERE key=?', (val, key)) db.commit() db.close()
# -*- coding: utf-8 -*- import falcon import hashlib import json import sqlite3 from datetime import datetime from settings import DB_NAME from tasks import hashsum class Handler(object): def __init__(self): self.db = sqlite3.connect(DB_NAME) cur = self.db.cursor() cur.execute('CREATE TABLE IF NOT EXISTS keys (key TEXT, val INTEGER)') self.db.commit() def add_key(self, key): cur = self.db.cursor() cur.execute('INSERT INTO keys (key) values(?)', (key, )) self.db.commit() def get_data(self): data = {} cur = self.db.cursor() for key, val in cur.execute('SELECT key, val FROM keys'): data[key] = val self.db.commit() return data def on_get(self, req, resp): key = hashlib.md5(str(datetime.now()).encode('utf-8')).hexdigest() self.add_key(key) resp.body = json.dumps({'db': self.get_data()}) hashsum.spool(key=key) serv = falcon.API() serv.add_route('/', Handler())
Всё заработало как надо. Результат работы в браузере:
{ "db": { "35f3fd25fc8abea97908b13854cfc813": 2300, "da90ec1480702833e9f16d6ea3d59a7a": 2243, "d70719f8adb446aa04accf8f12cd8984": null, "23a06eda818e0abae56c58498e4bb4c1": 2330 } }
Вот так получилось, что к приложению на мини-фреймворке пришлось прикручивать файл настроек, чтобы запустить задачу в фоне.
На самом деле, нужно посмотреть, как похожие задачи решают сами разработчики Falcon, потому что я пошёл по явно "джанговскому" пути. Просто очень много работаю с Django, вот и привык к таким его частям, как файл настроек.