# REDFLAG: modified need to push changes back into main repo """ This module contains classes for scheduling and running large numbers of FCP requests. Copyright (C) 2008 Darrell Karbott This library is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA Author: djk@isFiaD04zgAgnrEC5XJt1i4IE7AkNPqhBG5bONi6Yks """ import time from fcpconnection import MinimalClient class QueueableRequest(MinimalClient): """ A request which can be queued in a RequestQueue and run by a RequestRunner. """ def __init__(self, queue): MinimalClient.__init__(self) self.queue = queue self.message_callback = None # set by RequestRunner # The time after which this request should be canceled. self.cancel_time_secs = None # RequestQueue.next_request() MUST set this class RequestRunner: """ Class to run requests scheduled on one or more RequestQueues. """ def __init__(self, connection, concurrent): self.connection = connection self.concurrent = concurrent # request id -> client self.running = {} self.request_queues = [] self.index = 0 def add_queue(self, request_queue): """ Add a queue to the scheduler. """ if not request_queue in self.request_queues: self.request_queues.append(request_queue) def remove_queue(self, request_queue): """ Remove a queue from the scheduler. """ if request_queue in self.request_queues: self.request_queues.remove(request_queue) def cancel_request(self, client): """ Cancel a request. This is asynchronous. """ #print "CLIENT: ", client, type(client) if type(client) == type(1): raise Exception("Hack added to find bug: REDFLAG") self.connection.remove_request(client.request_id()) # REDFLAG: BUG: fix to set cancel time in the past. # fix kick to check cancel time before starting? def kick(self): """ Run the scheduler state machine. You MUST call this frequently. """ if self.connection.is_uploading(): # REDFLAG: Test this code path! #print "kick -- bailed out, still UPLOADING..." # Wait for upload to finish. return # Cancel running requests which have timed out. now = time.time() for client in self.running.values(): assert client.cancel_time_secs if client.cancel_time_secs < now: self.connection.remove_request(client.request_id()) # REDFLAG: test this code with multiple queues!!! # Round robin schedule requests from queues idle_queues = 0 # Catch before uninsightful /0 error on the next line. assert len(self.request_queues) > 0 self.index = self.index % len(self.request_queues) # Paranoid start_index = self.index while (len(self.running) < self.concurrent and idle_queues < len(self.request_queues) and not self.connection.is_uploading()): #print "IDLE_QUEUES:", idle_queues if self.index == start_index: idle_queues = 0 client = self.request_queues[self.index].next_runnable() #print "CLIENT:", client if client: #print "client.queue: ", client.queue #print "running: ", client #if 'URI' in client.in_params.fcp_params: # print " ", client.in_params.fcp_params['URI'] assert client.queue == self.request_queues[self.index] client.in_params.async = True client.message_callback = self.msg_callback self.running[self.connection.start_request(client)] \ = client else: idle_queues += 1 self.index = (self.index + 1) % len(self.request_queues) def msg_callback(self, client, msg): """ Route incoming FCP messages to the appropriate queues. """ if client.is_finished(): client.queue.request_done(client, msg) #print "RUNNING:" #print self.running del self.running[client.request_id()] self.kick() # haha else: client.queue.request_progress(client, msg) class RequestQueue: """ Abstract base class for request queues. """ def __init__(self, runner): self.runner = runner def next_runnable(self): """ Return a MinimalClient instance for the next request to be run or None if none is available. """ pass def request_progress(self, client, msg): """ Handle non-terminal FCP messages for running requests. """ pass def request_done(self, client, msg): """ Handle terminal FCP messages for running requests. """ pass