Issue

最近写了一个api,会从mysql当中读取某一行的记录,并根据原始记录的数值来执行不同的操作,测试时发现在并发调用的时候出现了问题。问题可简化为:

  1. 读取mysql的数据中一个计数器字段 count:
  2. 根据 count的值进行不同的操作:
    a. 若 count == 0,执行操作A
    b. 若 count > 0,执行操作B,并使 count - 1

问题在于当两个(或多个)请求同时触发时,会发生幻读,比如两个请求在同一时间读取到 count 为1,然后它们同时都会触发操作B,并使 count - 1,最终的结果是 count变成了0,且进行了两次操作B。而我们理想中应该是先进行一次操作B,再进行一次操作A。

Solution

根据mysql的事务一致性标准不同,解决幻读问题有两种方式:

  1. 将一致性标准设为最高的 serlization
  2. 锁表中的记录

针对第二种方法,记录下我用SQLAlchemy的ORM的代码如下(用了.with_for_update()对单条记录加锁):

try:
    db.session.begin()
    # Lock the record using FOR UPDATE
    task_record = db.session.query(WorkflowTask).filter_by(workflow_id=workflow_id,
                                                           check_signature=check_signature).with_for_update().first()
    if task_record.parent_count == 1:
        logger.info('starting check point')
        res = signature.delay(workflow_id=workflow_id, start_time=args.get('start_time'),
                              end_time=args.get('end_time'))
        db.session.query(WorkflowTask).filter_by(workflow_id=workflow_id, id=task_record.id).update(
            (dict(status='STARTED', parent_count=0, task_id=res.id, is_triggered=True)))
        db.session.commit()
        return {'status': 'success', 'data': {'task_id': res.id}}, 201
    else:
        logger.info('not ready to start, mark parent_count to {}'.format(task_record.parent_count - 1))
        db.session.query(WorkflowTask).filter_by(workflow_id=workflow_id, id=task_record.id).update(
            (dict(parent_count=WorkflowTask.parent_count - 1,
                  is_triggered=True if args.get('trigger') == 1 else False)))
        db.session.commit()
        return {'status': 'success', 'data': {'task_id': None}}, 201
except Exception as e:
    logger.error(getattr(e, 'message', repr(e)), exc_info=True)
    db.session.rollback()
    return {'status': 'failed', 'message': getattr(e, 'message', repr(e))}, 500