NAME POE::Component::PreforkDispatch - Preforking task dispatcher DESCRIPTION Applications that require lots of asynchronous tasks going at once may suffer a performance hit from repeating the fork/die process over and over again with each enqueued job. Similar to how Apache forks, this dispatcher will maintain a pool of available forks and a queue of pending tasks. Each task (request) will be handled in turn, and will return to the callback when done. SYNOPSIS use POE qw(Component::PreforkDispatch); POE::Session->create( inline_states => { _start => \&start, do_slow_task => \&task, do_slow_task_cb => \&task_cb, }, ); $poe_kernel->run(); sub start { POE::Component::PreforkDispatch->create( max_forks => 4, pre_fork => 2, ); foreach (1..5) { print "Enqueued request $_\n"; $poe_kernel->post(PreforkDispatch => 'new_request', { method => 'do_slow_task', upon_result => 'do_slow_task_cb', params => [ 'a value', $_, ], }); } } sub task { my ($kernel, $heap, $from, $param1, $param2) = @_[KERNEL, HEAP, ARG0 .. $#_]; # ... do something slow print STDERR "Task running with '$param1', '$param2'\n"; sleep 10; # Return hashref or arrayref return { success => 1 }; } sub task_cb { my ($kernel, $heap, $request, $result) = @_[KERNEL, HEAP, ARG0, ARG1]; print STDERR "Task with param ".$request->{params}[1]." returned " .($result->{success} ? 'successful' : 'failure')."\n"; } USAGE Methods Constructor Call ->create() like with any other "POE::Session", passing a list of named parameters. * methods => \%methods * classes => \@classes * xmlrpc_server_parent => $session_name Provide an optional means of finding a method to dispatch a request to. If none are provided, the request itself needs to indicate it's method. methods => { 'do_something' => \&do_something, 'do_else' => 'do_else_state', }, Methods will be searched for by name and will call either the state or the subroutine. See below for how either is called. classes => [ 'My::Class' ], Methods will be attempted in each namespace provided, and called as subroutines. xmlrpc_server_parent => 'XMLRPC_Session_Alias', Requests will be wrapped in a pseudo-transaction capable of being passed onto a POE::Component::Server::XMLRPC session for handling. * upon_result => $subref || $state_name If provided, used as a fallback result function to send completed requests to. * max_forks => $num Number of forks, max, to spawn off to handle requests. * pre_fork => $num How many forks to start out with. The rest are spawned as needed, with a 2 sec delay between new forks. * max_requests => $num How many requests each fork can handle before being slayed and respawned (if necessary). * verbose => $num (defaults 0) * talkback => sub { } The dispatcher logs certain events, and can be verbose about it. The talkback function will be passed a single arg of a log line. This defaults to printing to STDOUT. * fork_name => $name In process lists on a POSIX system, you can change the name of the forked children so you can at a glance know that they're dispatcher forks and not the parent process. Will be renamed to "$name child". * alias => $session_name Provide a session name. Defaults to 'PreforkDispatch'. Session States new_request (\%param) The primary interface to enqueueing requests. Takes the following arguments in a hashref. $poe_kernel->post( PreforkDispatch => 'new_request', { method_name => 'do_something', params => [ 'Computer 3' ], }); * method_name Provide a method name for searching for an appropriate method to dispatch to. Most akin to XMLRPC's method_name. * method => $subref || $session_state Instead of using the method_name, you can provide the method session state or subref to use as a request handler. * upon_result => $subref || $session_state Instead of using the global upon_result, provide a per-request callback. * params => $arrayref An arrayref, this is where you put your payload of the request. * from An XMLRPC value, this is not used typically for a single-host application. Request to response After a new_request() is issued, the dispatcher will process it in a FIFO queue, using forks if available, or handling it synchronously otherwise. Handling a request is done by searching for a valid method, either picking the $request->{method}, or if not available, searching the dispatcher methods, classes and finally the xmlrpc_server_parent for something to handle $request->{method_name}. If the method given is a subref, it will be passed ($from, @args). If a POE session state name, the calling session will have this state posted to with the same args ($from, @args): my ($from, @args) = @_; or my ($kernel, $heap, $from, @args) = @_[KERNEL, HEAP, ARG0 .. $#_]; Once the request is handled, successfully or not, a response is sent to either the request's 'upon_result', or the dispatchers. If the method is a subref, it will be handed ($request, $response). Similar for session state. The request will be the same as passed, but with the additional key/value of 'elapsed' containing the seconds the request took to process. The response will be the response value of the method that handled the request, or in the case of an error, a hashref with the key 'error'. Special methods There are some methods that are special and can be used to control child fork behavior * _precall * _postcall Not sure if these are useful, but will be called before and after the named method. Can be used as universal constructor/destructors for method calls. Passed the main method params. * _fork_preinit * _fork_postinit Code to be called before and after actually forking (in the parent process). * _fork_init Not passed anything, this permits the fork to do something that's better done after forking (opening handles and such). SEE ALSO POE, POE::Component::Pool::Thread, POE::Component::JobQueue TODO * Class-based method discovery * More tests AUTHOR Eric Waters COPYRIGHT Copyright (c) 2007 Eric Waters and XMission LLC (http://www.xmission.com/). All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself. The full text of the license can be found in the LICENSE file included with this module.