## event-loop.rb --- high-level IO multiplexer
# Copyright (C) 2005 Daniel Brockman
# This program is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation;
# either version 2 of the License, or (at your option) any
# later version.
# This file is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public
# License along with this program; if not, write to the Free
# Software Foundation, 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301, USA.
require "puppet/external/event-loop/better-definers"
require "puppet/external/event-loop/signal-system"
require "fcntl"
class EventLoop
include SignalEmitter
IO_STATES = [:readable, :writable, :exceptional]
class << self
def default ; @default ||= new end
def default= x ; @default = x end
def current
Thread.current["event-loop::current"] || default end
def current= x
Thread.current["event-loop::current"] = x end
def with_current (new)
if current == new
yield
else
begin
old = self.current
self.current = new
yield
ensure
self.current = old
end
end
end
def method_missing (name, *args, &block)
if current.respond_to? name
current.__send__(name, *args, &block)
else
super
end
end
end
define_signals :before_sleep, :after_sleep
def initialize
@running = false
@awake = false
@wakeup_time = nil
@timers = []
@io_arrays = [[], [], []]
@ios = Hash.new do |h, k| raise ArgumentError,
"invalid IO event: #{k}", caller(2) end
IO_STATES.each_with_index { |x, i| @ios[x] = @io_arrays[i] }
@notify_src, @notify_snk = IO.pipe
# prevent file descriptor leaks
@notify_src.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
@notify_snk.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
@notify_src.will_block = false
@notify_snk.will_block = false
# Each time a byte is sent through the notification pipe
# we need to read it, or IO.select will keep returning.
monitor_io(@notify_src, :readable)
@notify_src.extend(Watchable)
@notify_src.on_readable do
begin
@notify_src.sysread(256)
rescue Errno::EAGAIN
# The pipe wasn't readable after all.
end
end
end
define_opposite_accessors \
:stopped? => :running?,
:sleeping? => :awake?
def run
if block_given?
thread = Thread.new { run }
yield ; quit ; thread.join
else
running!
iterate while running?
end
ensure
quit
end
def iterate (user_timeout=nil)
t1, t2 = user_timeout, max_timeout
timeout = t1 && t2 ? [t1, t2].min : t1 || t2
select(timeout).zip(IO_STATES) do |ios, state|
ios.each { |x| x.signal(state) } if ios
end
end
private
def select (timeout)
@wakeup_time = timeout ? Time.now + timeout : nil
# puts "waiting: #{timeout} seconds"
signal :before_sleep ; sleeping!
IO.select(*@io_arrays + [timeout]) || []
ensure
awake! ; signal :after_sleep
@timers.each { |x| x.sound_alarm if x.ready? }
end
public
def quit ; stopped! ; wake_up ; self end
def monitoring_io? (io, event)
@ios[event].include? io end
def monitoring_timer? (timer)
@timers.include? timer end
def monitor_io (io, *events)
for event in events do
unless monitoring_io?(io, event)
@ios[event] << io ; wake_up
end
end
end
def monitor_timer (timer)
unless monitoring_timer? timer
@timers << timer
end
end
def check_timer (timer)
wake_up if timer.end_time < @wakeup_time
end
def ignore_io (io, *events)
events = IO_STATES if events.empty?
for event in events do
wake_up if @ios[event].delete(io)
end
end
def ignore_timer (timer)
# Don't need to wake up for this.
@timers.delete(timer)
end
def max_timeout
return nil if @timers.empty?
[@timers.collect { |x| x.time_left }.min, 0].max
end
def wake_up
@notify_snk.write('.') if sleeping?
end
end
class Symbol
def io_state?
EventLoop::IO_STATES.include? self
end
end
module EventLoop::Watchable
include SignalEmitter
define_signals :readable, :writable, :exceptional
def monitor_events (*events)
EventLoop.monitor_io(self, *events) end
def ignore_events (*events)
EventLoop.ignore_io(self, *events) end
define_soft_aliases \
:monitor_event => :monitor_events,
:ignore_event => :ignore_events
def close ; super
ignore_events end
def close_read ; super
ignore_event :readable end
def close_write ; super
ignore_event :writable end
module Automatic
include EventLoop::Watchable
def add_signal_handler (name, &handler) super
monitor_event(name) if name.io_state?
end
def remove_signal_handler (name, handler) super
if @signal_handlers[name].empty?
ignore_event(name) if name.io_state?
end
end
end
end
class IO
def on_readable &block
extend EventLoop::Watchable::Automatic
on_readable(&block)
end
def on_writable &block
extend EventLoop::Watchable::Automatic
on_writable(&block)
end
def on_exceptional &block
extend EventLoop::Watchable::Automatic
on_exceptional(&block)
end
def will_block?
require "fcntl"
fcntl(Fcntl::F_GETFL, 0) & Fcntl::O_NONBLOCK == 0
end
def will_block= (wants_blocking)
require "fcntl"
flags = fcntl(Fcntl::F_GETFL, 0)
if wants_blocking
flags &= ~Fcntl::O_NONBLOCK
else
flags |= Fcntl::O_NONBLOCK
end
fcntl(Fcntl::F_SETFL, flags)
end
end
class EventLoop::Timer
include SignalEmitter
DEFAULT_INTERVAL = 0.0
DEFAULT_TOLERANCE = 0.001
def initialize (options={}, &handler)
@running = false
@start_time = nil
if options.kind_of? Numeric
options = { :interval => options }
end
if options[:interval]
@interval = options[:interval].to_f
else
@interval = DEFAULT_INTERVAL
end
if options[:tolerance]
@tolerance = options[:tolerance].to_f
elsif DEFAULT_TOLERANCE < @interval
@tolerance = DEFAULT_TOLERANCE
else
@tolerance = 0.0
end
@event_loop = options[:event_loop] || EventLoop.current
if block_given?
add_signal_handler(:alarm, &handler)
start unless options[:start?] == false
else
start if options[:start?]
end
end
define_readers :interval, :tolerance
define_signal :alarm
def stopped? ; @start_time == nil end
def running? ; @start_time != nil end
def interval= (new_interval)
old_interval = @interval
@interval = new_interval
if new_interval < old_interval
@event_loop.check_timer(self)
end
end
def end_time
@start_time + @interval end
def time_left
end_time - Time.now end
def ready?
time_left <= @tolerance end
def restart
@start_time = Time.now
end
def sound_alarm
signal :alarm
restart if running?
end
def start
@start_time = Time.now
@event_loop.monitor_timer(self)
end
def stop
@start_time = nil
@event_loop.ignore_timer(self)
end
end
if __FILE__ == $0
require "test/unit"
class TimerTest < Test::Unit::TestCase
def setup
@timer = EventLoop::Timer.new(:interval => 0.001)
end
def test_timer
@timer.on_alarm do
puts "[#{@timer.time_left} seconds left after alarm]"
EventLoop.quit
end
8.times do
t0 = Time.now
@timer.start ; EventLoop.run
t1 = Time.now
assert(t1 - t0 > @timer.interval - @timer.tolerance)
end
end
end
end
## event-loop.rb ends here.
syntax highlighted by Code2HTML, v. 0.9.1