Simple database lock for MySQL

Updated . Posted . Visible to the public.

Note: For PostgreSQL you should use advisory locks. For MySQL we still recommend the solution in this card.


If you need to synchronize multiple rails processes, you need some shared resource that can be used as a mutex. One option is to simply use your existing (MySQL) database.

The attached code provides a database-based model level mutex for MySQL. You use it by simply calling

Lock.acquire('string to synchronize on') do
  # non-threadsafe code
end

You must make sure the created locks table uses the InnoDB engine, though.

Only one process can enter the block using the same string, so a concurrent call to another Lock.acquire('string to synchronize on') will block, whereas Lock.acquire('some other string') will pass.

Usage Conditions

Lock.acquire will help when you have a piece of code that must not be executed concurrently, i.e. it should be used on model level. An example is daily newsletter delivery that must not happen twice when two users trigger "deliver daily newsletter" simultaneously.

Lock.acquire can not be a replacement for table row locks (aka record locks), i.e. it cannot prevent concurrent updates by other code that touches a record used inside Lock.aquire. An example is wrapping money transfer between accounts. While it would prevent concurrent money transfer, other code (e.g. a pay-in) could update accounts involved in a money transfer and balances would still be off.

Code

# app/models/lock.rb

class Lock < ActiveRecord::Base

  def self.acquire(name)
    already_acquired = definitely_acquired?(name)

    if already_acquired
      yield
    else
      begin
        create(:name => name) unless find_by_name(name)
      rescue ActiveRecord::StatementInvalid
        # concurrent create is okay
      end

      begin
        result = nil

        transaction do
          find_by_name(name, :lock => true) # this is the call that will block
          acquired_lock(name)
          result = yield
        end

        result
      ensure
        maybe_released_lock(name) 
      end
    end
  end

  # if true, the lock is acquired
  # if false, the lock might still be acquired, because we were in another db transaction
  def self.definitely_acquired?(name)
    !!Thread.current[:definitely_acquired_locks] and Thread.current[:definitely_acquired_locks].has_key?(name)
  end

  def self.acquired_lock(name)
    logger.debug("Acquired lock '#{name}'")
    Thread.current[:definitely_acquired_locks] ||= {}
    Thread.current[:definitely_acquired_locks][name] = true
  end

  def self.maybe_released_lock(name)
    logger.debug("Released lock '#{name}' (if we are not in a bigger transaction)")
    Thread.current[:definitely_acquired_locks] ||= {}
    Thread.current[:definitely_acquired_locks].delete(name)
  end

  private_class_method :acquired_lock, :maybe_released_lock

end
# db/migrate/???_create_locks.rb

class CreateLocks < ActiveRecord::Migration

  def self.up
    create_table :locks do |t|
      t.string :name, :limit => 40
      t.timestamps
    end
    add_index :locks, :name, :unique => true
  end

  def self.down
    remove_index :locks, :column => :name
    drop_table :locks
  end

end
# spec/models/lock_spec.rb

describe Lock, '.acquire' do

  before :each do
    @reader, @writer = IO.pipe
  end

  def fork_with_new_connection
    config = ActiveRecord::Base.remove_connection
    fork do
      begin
        ActiveRecord::Base.establish_connection(config)
        yield
      ensure
        ActiveRecord::Base.remove_connection
        Process.exit!
      end
    end
    ActiveRecord::Base.establish_connection(config)
  end

  it 'should synchronize processes on the same lock' do
    (1..20).each do |i|
      fork_with_new_connection do
        @reader.close
        ActiveRecord::Base.connection.reconnect!
        Lock.acquire('lock') do
          @writer.puts "Started: #{i}"
          sleep 0.01
          @writer.puts "Finished: #{i}"
        end
        @writer.close
      end
    end
    @writer.close

    # test whether we always get alternating "Started" / "Finished" lines
    lines = []
    @reader.each_line { |line| lines << line }
    lines.should be_present # it is empty if the processes all crashed due to a typo or similar
    lines.each_slice(2) do |start, finish|
      start.should =~ /Started: (.*)/
      start_thread = $1
      finish.should =~ /Finished: (.*)/
      finish_thread = $1
      finish_thread.should == start_thread
    end

    @reader.close
  end
end

Caveats

  • Everytime you call Lock.acquire with a new string, a row is created in a database table and left there. If you use a large number of unique strings, you might want to clean up this table regularly.
  • Lock consistency is not guaranteed in a failover event (at least on MySQL).
Tobias Kraze
Last edit
Michael Leimstädtner
Keywords
semaphore, multithreading, multithreaded, multi-threading, multi-threaded, mutex
License
Source code in this card is licensed under the MIT License.
Posted by Tobias Kraze to makandra dev (2011-02-28 12:48)