samedi 18 juillet 2015

Celery - Querying Sqlite DB during task

I have a Python-based Flask app in which I use the Celery task queue to handle a set of e-mail tasks. I would like the Celery task to query the SQLite database that the rest of the app uses, so that it can pull in certain data, but I keep getting the following error. If I remove the one line in the Celery task that queries the SQLite database, the task executes without this error, so I assume I am making a fundamental mistake in how I am tying Celery and my database together.

[2015-07-18 21:36:25,168: ERROR/MainProcess] Process 'Worker-1' pid:6657 exited with 'signal 11 (SIGSEGV)'
[2015-07-18 21:36:25,187: ERROR/MainProcess] Task app.views.send_mail_task[4c3d5b1a-5ac3-4ab8-b633-b925eba5dd02] raised unexpected: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV).',)
Traceback (most recent call last):
  File "/Users/wyssuser/Desktop/new_danish/venv/lib/python2.7/site-packages/billiard/pool.py", line 1171, in mark_as_worker_lost
    human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV).

The relevant code is as follows:

models.py

from datetime import datetime
from flask.ext.bcrypt import generate_password_hash
from flask.ext.login import UserMixin
from peewee import *

import config


# Module-level peewee handle for the SQLite file ``danish.db``; SQLite resolves
# this relative path against the process's current working directory.
# Company binds to it through ``Meta.database``.
# NOTE(review): an SQLite connection must not be reused across fork(). If a
# connection is opened in a parent process before Celery forks its workers,
# each task should open and close its own connection - verify where
# DATABASE.connect() is called (possibly in the DbTask base class).
DATABASE = SqliteDatabase('danish.db')


class Company(Model):
  """A scraped company that may be contacted by e-mail.

  Stored in the module-level ``DATABASE``; queries are ordered by
  ``have_contacted`` by default (see ``Meta.order_by``).
  """
  dont_contact_anymore = BooleanField(default=False)
  company_name = CharField()
  website = CharField(unique=True)  # unique: a duplicate insert raises IntegrityError
  email_address = CharField()
  country = CharField()
  # NOTE(review): peewee appears to expect ``formats`` to be a list of strptime
  # patterns; a bare string would be iterated character by character. Left
  # unchanged because it affects how stored values are parsed - confirm intent.
  scraped_on = DateTimeField(formats="%m-%d-%Y")
  have_contacted = BooleanField(default=False)
  current_pipeline_phase = IntegerField(default=0)


  sector = CharField()
  contacted_by = ForeignKeyField(
    rel_model=User,
    related_name='contacted_by',
    db_column='contacted_by'
    )

  class Meta:
    database = DATABASE
    order_by = ('have_contacted',)

  @classmethod
  def create_company(cls, company_name, website, email_address):
    """Insert a new company inside a transaction, reporting the outcome on stdout.

    :param company_name: display name of the company.
    :param website: company website; must be unique across rows.
    :param email_address: contact address for the company.

    An ``IntegrityError`` is caught and reported rather than raised.
    """
    try:
      with DATABASE.transaction():
        # Call datetime.now(): passing the bare function stored/bound the
        # function object instead of a timestamp.
        cls.create(company_name=company_name, website=website,
                   email_address=email_address, scraped_on=datetime.now())
        print('Saved {}'.format(company_name))
    except IntegrityError:
      # NOTE(review): country, sector and contacted_by are declared without
      # null=True or a default, so a NOT NULL violation may also land here and
      # be misreported as a duplicate - confirm against the table schema.
      print('{} already exists in the database'.format(company_name))



def initialize():
  """Create the Company, User and Post tables if they do not already exist.

  ``safe=True`` makes ``create_tables`` skip tables that already exist. The
  connection is closed in a ``finally`` so that a failure while creating
  tables does not leave it open.
  """
  DATABASE.connect()
  try:
    DATABASE.create_tables([Company, User, Post], safe=True)
  finally:
    DATABASE.close()

celery.js

/**
 * Collect the selected company ids and POST them to /email_task to start the
 * background e-mail job. On success, start polling the status URL that the
 * server returns in the response's Location header.
 */
function start_the_magic() {
  // get_the_ids() returns [idsToContact, idsToRemove].
  var ids = get_the_ids();

  var contact = ids[0];
  var remove = ids[1];

  console.log(contact);
  console.log(remove);

  $.ajax({
      type: 'POST',
      url: '/email_task',
      data: JSON.stringify({contact: contact, remove: remove}),
      contentType: "application/json; charset=utf-8",
      dataType: 'json',
      success: function(data, status, request) {
          // Declared with var: previously leaked as an implicit global.
          var status_url = request.getResponseHeader('Location');
          update_progress(status_url, '#progress div');
      },
      error: function() {
          alert('Unexpected error');
      }
  });
}


/**
 * Poll the task status endpoint and render progress.
 *
 * Fetches status_url as JSON ({state, current, total, status[, result]}),
 * replaces the progress container with the bootstrap_progress markup, and
 * writes the percentage into status_div. It re-polls every 2 seconds while
 * the state is PENDING or PROGRESS.
 *
 * @param {string} status_url  URL returned in the Location header by /email_task.
 * @param {string} status_div  jQuery selector for the element showing the percentage.
 */
function update_progress(status_url, status_div) {
  $.getJSON(status_url, function(data) {
      // Guard against a zero/missing total, which would render "NaN%".
      var total = data['total'];
      var percent = total ? Math.floor(data['current'] * 100 / total) : 0;

      // Replace the table with the progress bar (.html() overwrites content).
      $('div.progress-container').html(bootstrap_progress);
      $(status_div).text(percent + '%');

      if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
          if ('result' in data) {
              console.log('update progress worked!');
          }
          else {
              // Finished without a result, e.g. the task failed.
              console.log('something unexpected happened');
          }
      }
      else {
          // Still running: poll again in 2 seconds.
          setTimeout(function() {
              update_progress(status_url, status_div);
          }, 2000);
      }
  });
}

view.py

from flask import render_template, flash, redirect, session, url_for, request, g, jsonify
from flask.ext.login import login_user, logout_user, current_user, login_required, LoginManager
from flask.ext.bcrypt import check_password_hash
from config import DATABASE_QUERY_TIMEOUT
from datetime import datetime
from app import app, db, lm, celery
from .forms import GetClientsForm, LoginForm, CompanyForm, UpdateCompanyForm
from .models import User, Post, DoesNotExist, Company

try: 
  import simplejson as json
except:
  import json

import time
import random


@app.route('/email_task', methods=['POST'])
def email_task():
    """Start ``send_mail_task`` for the posted company ids.

    Expects a JSON body ``{"contact": [...], "remove": [...]}`` as sent by the
    front-end. Responds ``202`` with an empty JSON object and a ``Location``
    header pointing at the task's status endpoint. Responds ``400`` when the
    body is not JSON or lacks either key.
    """
    try:
        data = json.loads(request.data)
        contact = data['contact']
        remove = data['remove']
    except (ValueError, KeyError, TypeError):
        # Malformed input previously escaped as an unhandled 500 error.
        return jsonify({'error': 'expected JSON with "contact" and "remove"'}), 400

    print(contact)
    print(remove)

    task = send_mail_task.apply_async(args=[contact, remove])
    return jsonify({}), 202, {'Location': url_for('taskstatus',
                                                  task_id=task.id)}


@app.route('/status/<task_id>')
def taskstatus(task_id):
    """Return the progress of ``send_mail_task`` *task_id* as JSON.

    The response always has ``state``, ``current``, ``total`` and ``status``.
    It also has ``result`` once the task's metadata contains one.
    """
    task = send_mail_task.AsyncResult(task_id)
    # Read the state once so every branch sees the same value.
    state = task.state

    if state == 'PENDING':
        response = {
            'state': state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif state != 'FAILURE':
        # For PROGRESS/SUCCESS, info is the meta dict or the task's return dict.
        # Other states (e.g. RETRY, REVOKED) may carry an exception or None, so
        # fall back to empty metadata instead of crashing on .get().
        info = task.info if isinstance(task.info, dict) else {}
        response = {
            'state': state,
            'current': info.get('current', 0),
            'total': info.get('total', 1),
            'status': info.get('status', '')
        }
        if 'result' in info:
            response['result'] = info['result']
    else:
        # Something went wrong in the background job.
        response = {
            'state': state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }

    return jsonify(response)


@celery.task(base=DbTask, bind=True)
def send_mail_task(self, contact, remove):
    """Work through the companies in *contact*, publishing PROGRESS updates.

    :param contact: ids of ``Company`` rows to e-mail.
    :param remove: ids of companies to remove; not used by this task yet.
    :returns: final progress metadata including a ``result`` key.

    A missing id presumably raises peewee's ``DoesNotExist`` from ``.get()``
    and fails the task.
    """
    total_contact_companies = len(contact)

    for num, company_id in enumerate(contact):
        # NOTE(review): if the worker segfaults (SIGSEGV) here, check that the
        # SQLite connection was not opened in the parent before Celery forked;
        # each task should use its own connection (see DbTask) - verify.
        company = Company.select().where(Company.id == company_id).get()
        # Unicode format strings: under Python 2, str.format with a non-ASCII
        # unicode argument raises UnicodeEncodeError.
        print(u'Contacting {}'.format(company.company_name))  # was str + model -> TypeError
        message = u'E-mailing {} at {}...'.format(company.company_name,
                                                  company.email_address)

        self.update_state(state='PROGRESS',
                          meta={'current': num, 'total': total_contact_companies,
                                'status': message})
        time.sleep(3)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}

Aucun commentaire:

Enregistrer un commentaire