I have a Python based flask app where I am using the Celery task queue to handle a set of e-mail tasks. I would like the Celery task to be able to query a sqlite database that I have tied into the whole app to pull in and use certain data but I keep getting this following error. If I pull out the one line in the celery task that queries my SQLite database, the task then executes without throwing this error so my assumption is that I am making a fundamental error regarding tying celery and my database together.
[2015-07-18 21:36:25,168: ERROR/MainProcess] Process 'Worker-1' pid:6657 exited with 'signal 11 (SIGSEGV)'
[2015-07-18 21:36:25,187: ERROR/MainProcess] Task app.views.send_mail_task[4c3d5b1a-5ac3-4ab8-b633-b925eba5dd02] raised unexpected: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV).',)
Traceback (most recent call last):
File "/Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/billiard/pool.py", line 1171, in mark_as_worker_lost
human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV).
The relevant code is as follows:
models.py
from datetime import datetime
from flask.ext.bcrypt import generate_password_hash
from flask.ext.login import UserMixin
from peewee import *
import config
DATABASE = SqliteDatabase('danish.db')
class Company(Model):
dont_contact_anymore = BooleanField(default=False)
company_name = CharField()
website = CharField(unique=True)
email_address = CharField()
country = CharField()
scraped_on = DateTimeField(formats="%m-%d-%Y")
have_contacted = BooleanField(default=False)
current_pipeline_phase = IntegerField(default=0)
sector = CharField()
contacted_by = ForeignKeyField(
rel_model=User,
related_name='contacted_by',
db_column='contacted_by'
)
class Meta:
database = DATABASE
order_by = ('have_contacted',)
@classmethod
def create_company(cls, company_name, website, email_address):
try:
with DATABASE.transaction():
cls.create(company_name=company_name, website=website, email_address=email_address, scraped_on=datetime.now)
print 'Saved {}'.format(company_name)
except IntegrityError:
print '{} already exists in the database'.format(company_name)
def initialize():
DATABASE.connect()
DATABASE.create_tables([Company, User, Post],safe=True)
DATABASE.close()
celery.js
function start_the_magic() {
//Get the ids of the companies to contact and remove
var ids = get_the_ids();
var contact = ids[0]
var remove = ids[1]
console.log(contact);
console.log(remove);
$.ajax({
type: 'POST',
url: '/email_task',
data: JSON.stringify({contact: contact, remove: remove}),
contentType: "application/json; charset=utf-8",
dataType: 'json',
success: function(data, status, request) {
status_url = request.getResponseHeader('Location');
update_progress(status_url, '#progress div');
},
error: function() {
alert('Unexpected error');
}
});
}
function update_progress(status_url, status_div) {
// send GET request to status URL
$.getJSON(status_url, function(data) {
// update UI
percent = parseInt(data['current'] * 100 / data['total']);
//Remove the table and install the progress bar
$('div.progress-container').html("");
$('div.progress-container').html(bootstrap_progress);
$(status_div).text(percent + '%');
if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
if ('result' in data) {
// show result
console.log('update progress worked!');
// $(status_div.childNodes[3]).text('Result: ' + data['result']);
}
else {
// something unexpected happened
console.log('something unexpected happened');
// $(status_div.childNodes[3]).text('Result: ' + data['state']);
}
}
else {
// rerun in 2 seconds
setTimeout(function() {
update_progress(status_url, status_div);
}, 2000);
}
});
}
view.py
from flask import render_template, flash, redirect, session, url_for, request, g, jsonify
from flask.ext.login import login_user, logout_user, current_user, login_required, LoginManager
from flask.ext.bcrypt import check_password_hash
from config import DATABASE_QUERY_TIMEOUT
from datetime import datetime
from app import app, db, lm, celery
from .forms import GetClientsForm, LoginForm, CompanyForm, UpdateCompanyForm
from .models import User, Post, DoesNotExist, Company
try:
import simplejson as json
except:
import json
import time
import random
@app.route('/email_task', methods=['POST'])
def email_task():
#get the unique IDs for the companies to contact and remove
data = json.loads(request.data)
contact = data['contact']
remove = data['remove']
print contact
print remove
task = send_mail_task.apply_async(args=[contact, remove])
return jsonify({}), 202, {'Location': url_for('taskstatus',
task_id=task.id)}
@app.route('/status/<task_id>')
def taskstatus(task_id):
task = send_mail_task.AsyncResult(task_id)
if task.state == 'PENDING':
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif task.state != 'FAILURE':
response = {
'state': task.state,
'current': task.info.get('current', 0),
'total': task.info.get('total', 1),
'status': task.info.get('status', '')
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
# something went wrong in the background job
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # this is the exception raised
}
return jsonify(response)
@celery.task(base=DbTask, bind=True)
def send_mail_task(self, contact, remove):
total_contact_companies = len(contact);
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
for num,i in enumerate(contact):
company = Company.select().where(Company.id == i).get()
print 'Contacting ' + company
message = 'E-mailing {} at {}...'.format(company.company_name, company.email_address)
self.update_state(state='PROGRESS',
meta={'current': num, 'total': total_contact_companies,
'status': message})
time.sleep(3)
return {'current': 100, 'total': 100, 'status': 'Task completed!',
'result': 42}
Aucun commentaire:
Enregistrer un commentaire