[gs-cvs] rev 8126 - in trunk/cluster: . casper tti
giles at ghostscript.com
giles at ghostscript.com
Sun Jul 15 07:04:35 PDT 2007
Author: giles
Date: 2007-07-15 07:04:35 -0700 (Sun, 15 Jul 2007)
New Revision: 8126
Added:
trunk/cluster/README
trunk/cluster/casper/
trunk/cluster/casper/dispatch.py
trunk/cluster/casper/push-new-rev.sh
trunk/cluster/casper/queue-new-rev.sh
trunk/cluster/casper/queue-rev-pcl.py
trunk/cluster/tti/
trunk/cluster/tti/ciatest.py
trunk/cluster/tti/reg_dispatch-pcl.py
trunk/cluster/tti/reg_dispatch.py
trunk/cluster/tti/regress.py
Log:
Record the current state of the cluster regression test codebase.
Added: trunk/cluster/README
===================================================================
--- trunk/cluster/README 2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/README 2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,15 @@
+These are the tools we use to do cluster-based regression testing.
+
+The 'casper' directory contains a daemon that watches a queue directory
+filled in by svn post-commit hook scripts, and copies them to the
+corresponding queue on the cluster machines.
+
+The 'tti' directory contains the daemons which launch regressions based
+on the queue files. Daemons can be launched interactively with no
+arguments, or with '-d start' and '-d stop' to run in the background,
+detached from any terminal.
+
+This is not a live version of the source: commits here are not
+automatically picked up by the running system. This is more a
+record of major code changes.
+
Added: trunk/cluster/casper/dispatch.py
===================================================================
--- trunk/cluster/casper/dispatch.py 2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/casper/dispatch.py 2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,144 @@
+#!/usr/bin/env python
+
+import os, sys, signal
+import time
+
+# seconds to sleep between polls of the queue directories
+sleeptime = 30
+# local queue directories read by getrev(): one for gs, one for ghostpcl
+queuedir = '/home/regression/tti/queue'
+pclqueuedir = '/home/regression/tti/queue.pcl'
+# ssh identity file and user@host used to reach the cluster
+ssh_id = '/home/regression/.ssh/ttitunnel'
+# a single user@host word: it is pasted into ssh and rsync command lines,
+# where embedded spaces would split it into separate arguments
+ssh_dest = 'atfxsw01@tticluster.com'
+
+class Daemon:
+    '''Class for forking off a daemon process.
+
+    The daemon's pid is written to self.pidfilename and, unless other
+    files are given to start(), its output is appended to
+    self.logfilename. Both are relative paths; the working directory
+    is deliberately left unchanged.'''
+    def __init__(self, main):
+        'Remember <main>, the callable to run inside the daemon.'
+        self.main = main
+        self.running = False
+        self.pidfilename = "reg_dispatch.pid"
+        self.logfilename = "reg_dispatch.log"
+
+    def start(self, stdin=None, stdout=None, stderr=None):
+        '''Fork off a separate process, running our function.
+
+        stdin, stdout and stderr optionally name files to attach to the
+        daemon's standard streams. They default to /dev/null, the log
+        file, and the same file as stdout respectively.
+
+        Returns True in the launching process once the first fork has
+        succeeded, and False if a pidfile already exists or that fork
+        failed. The intermediate process exits instead of returning.'''
+
+        # make sure we're not already running
+        if os.path.exists(self.pidfilename):
+            sys.stderr.write("daemon already running.\n")
+            sys.stderr.write("stop it, or remove the stale pidfile ")
+            sys.stderr.write(self.pidfilename + "\n")
+            # refuse to launch a second copy and overwrite the pidfile
+            return False
+
+        # fork from the parent process and return
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # todo: grab the pid and return
+                # after the daemon is running
+                return True
+        except OSError, e:
+            sys.stderr.write("initial daemonization fork failed: (%d) %s\n" %
+                (e.errno, e.strerror))
+            return False
+
+        # Decouple ourselves from the parent environment
+        # normally we chdir("/") to avoid blocking the unmount of our
+        # launch directory, but we can't do any work without it
+        # and it makes things simpler to use the cwd
+        #os.chdir("/")
+        os.umask(0)
+        os.setsid()
+
+        # Do a second fork so the daemon is not a session leader and
+        # therefore cannot acquire a controlling terminal
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # record the child's pid
+                pidfile = open(self.pidfilename, "w")
+                pidfile.write(str(pid) + "\n")
+                pidfile.close()
+                # exit the second parent
+                sys.exit(0)
+        except OSError, e:
+            sys.stderr.write("second daemonization fork failed: (%d) %s\n" %
+                (e.errno, e.strerror))
+            sys.exit(1)
+
+        # redirect standard file descriptors
+        if not stdin: stdin = "/dev/null"
+        if not stdout: stdout = self.logfilename
+        if not stderr: stderr = stdout
+        si = file(stdin, 'r')
+        so = file(stdout, 'a')
+        se = file(stderr, 'a')
+        os.dup2(si.fileno(), sys.stdin.fileno())
+        os.dup2(so.fileno(), sys.stdout.fileno())
+        os.dup2(se.fileno(), sys.stderr.fileno())
+
+        # execute the requested main function
+        self.main()
+
+    def stop(self):
+        '''Stop a running daemon.
+
+        Sends SIGHUP to the pid recorded in the pidfile, then removes
+        the pidfile so that a later start() is not refused. Returns
+        True if the signal was delivered and False otherwise.'''
+        try:
+            pidfile = open(self.pidfilename, "r")
+        except IOError:
+            sys.stderr.write("no pidfile or daemon not running.\n")
+            return False
+        line = pidfile.readline()
+        pidfile.close()
+        try:
+            pid = int(line)
+        except ValueError:
+            # empty or corrupt pidfile
+            pid = 0
+        stopped = False
+        if pid:
+            try:
+                os.kill(pid, signal.SIGHUP)
+                stopped = True
+            except OSError, e:
+                # stale pidfile: the recorded process is already gone
+                sys.stderr.write("couldn't signal daemon %d: %s\n" %
+                    (pid, e.strerror))
+        else:
+            sys.stderr.write("no pidfile or daemon not running.\n")
+        os.unlink(self.pidfilename)
+        return stopped
+
+
+def getrev(queue=queuedir):
+    '''Return the alphabetically first entry name in the <queue>
+    directory, or None when the directory is empty.'''
+    # ideally we'd order by mtime; alphabetical is what we have for now
+    pending = sorted(os.listdir(queue))
+    if not pending:
+        return None
+    # basename() drops any path components, which guards against
+    # entries that try to point outside the queue with relative paths
+    return os.path.basename(pending[0])
+
+# Poll the gs and pcl queue directories forever. A queued gs revision is
+# forwarded by touching a file in the cluster's queue over ssh. A queued
+# pcl entry, named '<pclrev>+<gsrev>', first has its source updated
+# locally and rsync'd to the cluster before being queued there. Each
+# handled entry is unlinked from its local queue.
+def mainloop():
+ # 'doing' is set whenever work is found and cleared after the idle
+ # message is printed, so that message appears once per idle period
+ doing = True
+ while True:
+ # check for ghostscript runs
+ rev = getrev(queuedir)
+ pclrev = getrev(pclqueuedir)
+ if rev:
+ doing = True
+ print 'submitting gs-r' + rev
+ cmd = 'ssh -i ' + ssh_id + ' ' + ssh_dest + ' '
+ #cmd += 'touch regression/rev.queue/' + rev
+ cmd += 'touch ' + os.path.join('regression/queue.gs/', rev)
+ os.system(cmd)
+ os.unlink(os.path.join(queuedir,rev))
+ continue
+ elif pclrev:
+ doing = True
+ rev = pclrev
+ print 'updating ghostpcl-r' + rev
+ # the queue entry name carries both revisions, joined by '+'
+ pclrev, gsrev = rev.split('+')
+ print rev, 'splits into gs rev', gsrev, 'and pcl rev', pclrev
+ cmd = 'svn update -r ' + pclrev + ' ghostpcl'
+ os.system(cmd)
+ # svn external will fail; override with a manual checkout
+ cmd = 'svn co http://svn.ghostscript.com:8080/ghostscript/trunk/gs -r ' + gsrev + ' ghostpcl/gs'
+ os.system(cmd)
+ print 'pushing update'
+ cmd = 'rsync -avz'
+ cmd += ' --exclude ufst --exclude .svn ghostpcl/*'
+ cmd += ' ' + ssh_dest + ':regression/ghostpcl-r' + rev + '/'
+ os.system(cmd)
+ print 'submitting ghostpcl-r' + rev
+ cmd = 'ssh -i ' + ssh_id + ' ' + ssh_dest + ' '
+ cmd += 'touch ' + os.path.join('regression/queue.pcl/', rev)
+ os.system(cmd)
+ os.unlink(os.path.join(pclqueuedir,rev))
+ else:
+ if doing:
+ print '-- nothing to do --'
+ doing = False
+ # NOTE(review): none of the os.system() exit statuses above are
+ # checked, so an entry is unlinked even if forwarding it failed
+ time.sleep(sleeptime)
+
+
+# Script entry point: always run mainloop as a background daemon. There
+# is no command-line handling here, so Daemon.stop() is never invoked.
+if __name__ == '__main__':
+ daemon = Daemon(mainloop)
+ result = daemon.start()
+ if not result:
+ print "couldn't start daemon!"
+ sys.exit(1)
+
Added: trunk/cluster/casper/push-new-rev.sh
===================================================================
--- trunk/cluster/casper/push-new-rev.sh 2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/casper/push-new-rev.sh 2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+# run me to request a cluster regression for an svn rev.
+# usage: push-new-rev.sh <revision>
+rev=$1
+
+if test -z "$rev"; then
+  echo "usage: $0 <revision>" >&2
+  exit 1
+fi
+
+SSH_ID=/home/regression/.ssh/ttitunnel
+SSH_HOST=atfxsw01@tticluster.com
+
+# branch on the exit status of ssh itself: 'test $?' only checks that
+# the status is a non-empty string, which holds for every exit code
+if ssh -i "$SSH_ID" "$SSH_HOST" touch "regression/rev.queue/$rev"; then
+  echo "regression request submitted to the cluster"
+else
+  echo "failed to submit regression request for r$rev" >&2
+  exit 1
+fi
Property changes on: trunk/cluster/casper/push-new-rev.sh
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/cluster/casper/queue-new-rev.sh
===================================================================
--- trunk/cluster/casper/queue-new-rev.sh 2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/casper/queue-new-rev.sh 2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,7 @@
+#!/bin/sh
+
+# run me to queue a new gs regression for testing
+# usage: queue-new-rev.sh <revision>
+
+QUEUE_DIR=/home/regression/tti/queue
+
+# without a revision the touch below would only hit the queue directory
+if test -z "$1"; then
+  echo "usage: $0 <revision>" >&2
+  exit 1
+fi
+
+touch "$QUEUE_DIR/$1"
Added: trunk/cluster/casper/queue-rev-pcl.py
===================================================================
--- trunk/cluster/casper/queue-rev-pcl.py 2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/casper/queue-rev-pcl.py 2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+
+# svn repositories whose youngest revisions are paired into one queue entry
+pclrepos = '/var/lib/svn-private/ghostpcl'
+gsrepos = '/var/lib/svn/ghostscript'
+# directory in which the '<pclrev>+<gsrev>' queue entry is created
+queuedir = '/home/regression/tti/queue.pcl'
+
+import os
+import sys
+
+def getrev(repos):
+    '''Return the youngest revision of the svn repository at <repos>,
+    as the stripped first line printed by 'svnlook youngest'. The
+    result is an empty string if the command printed nothing.'''
+    rev = os.popen('svnlook youngest ' + repos).readline().strip()
+    return rev
+
+def makerev():
+    '''Return the queue entry name '<pclrev>+<gsrev>', or None if
+    either revision could not be read as a number.'''
+    pclrev = getrev(pclrepos)
+    gsrev = getrev(gsrepos)
+    # an empty or non-numeric result would queue a bogus entry like '+'
+    if not (pclrev.isdigit() and gsrev.isdigit()):
+        return None
+    return pclrev + '+' + gsrev
+
+entry = makerev()
+if entry is None:
+    sys.stderr.write("couldn't determine the repository revisions\n")
+    sys.exit(1)
+os.system('touch ' + os.path.join(queuedir, entry))
Property changes on: trunk/cluster/casper/queue-rev-pcl.py
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/cluster/tti/ciatest.py
===================================================================
--- trunk/cluster/tti/ciatest.py 2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/tti/ciatest.py 2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+# test script for submitting messages to CIA
+
+# defaults copied into each Message: the XML-RPC server to deliver to,
+# and the project/module names written into the <source> element
+server = 'http://cia.navi.cx'
+project = 'ghostscript'
+module = 'gs'
+
+import xmlrpclib
+import time
+
+class Message:
+    '''A commit-style CIA notification, delivered over XML-RPC.
+
+    log is the message text and rev an optional revision. server,
+    project and module start from the file-level defaults and may be
+    changed on the instance before calling send().'''
+    def __init__(self, log='', rev='', module=module):
+        # the 'module' default is bound to the file-level value above;
+        # it is a keyword so callers can report under another module
+        self.log = log
+        self.rev = rev
+        self.server = server
+        self.project = project
+        self.module = module
+        self.name = 'manual test'
+        self.version = '0.1'
+    def _text(self, value):
+        'Return <value> as a string with &, < and > escaped for XML.'
+        from xml.sax.saxutils import escape
+        return escape(str(value))
+    def gen(self):
+        'Return the <generator> element as a string.'
+        xml = ' <generator>\n'
+        xml += ' <name>' + self._text(self.name) + '</name>\n'
+        xml += ' <version>' + self._text(self.version) + '</version>\n'
+        xml += ' </generator>\n'
+        return xml
+    def source(self):
+        'Return the <source> element as a string.'
+        xml = ' <source>\n'
+        xml += ' <project>' + self._text(self.project) + '</project>\n'
+        xml += ' <module>' + self._text(self.module) + '</module>\n'
+        xml += ' </source>\n'
+        return xml
+    def message(self):
+        '''Return the complete <message> document as a string.
+
+        The <revision> element is only included when rev is set. Text
+        fields are escaped so that log output containing markup
+        characters still yields well-formed XML.'''
+        xml = '<message>\n'
+        xml += self.gen()
+        xml += self.source()
+        xml += ' <body>\n'
+        xml += ' <commit>\n'
+        if self.rev:
+            xml += ' <revision>' + self._text(self.rev) + '</revision>\n'
+        xml += ' <author>regression</author>\n'
+        xml += ' <log>' + self._text(self.log) + '</log>\n'
+        xml += ' </commit>\n'
+        xml += ' </body>\n'
+        xml += ' <timestamp>' + str(int(time.time())) + '</timestamp>\n'
+        xml += '</message>\n'
+        return xml
+    def send(self, server = None):
+        '''Deliver the message through hub.deliver on <server>,
+        defaulting to self.server.'''
+        if not server: server = self.server
+        xmlrpclib.ServerProxy(server).hub.deliver(self.message())
+    def __str__(self):
+        return self.message()
+
+def irc_report(filename, rev=''):
+    '''Send the contents of <filename> to CIA as the log text of a
+    Message for revision <rev>.'''
+    # 'report' rather than 'file', which would shadow the builtin
+    report = open(filename)
+    try:
+        msg = report.read()
+    finally:
+        report.close()
+    m = Message(msg, rev)
+    m.send()
+
+# Manual test: report one hard-coded log file under a hard-coded revision.
+if __name__ == '__main__':
+ filename = 'regression-r7832.log'
+ print 'reporting results from \'%s\' to irc' % filename
+ irc_report(filename, '7832')
Added: trunk/cluster/tti/reg_dispatch-pcl.py
===================================================================
--- trunk/cluster/tti/reg_dispatch-pcl.py 2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/tti/reg_dispatch-pcl.py 2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,285 @@
+#!/usr/bin/env python
+
+# regression test dispatch script
+# we can be used to launch a parallel regression
+# or in daemon mode to run regressions on specific revisions
+
+import os, sys, signal
+import re, time
+import ciatest
+
+class Daemon:
+    '''Class for forking off a daemon process.
+
+    The daemon's pid is written to self.pidfilename and, unless other
+    files are given to start(), its output is appended to
+    self.logfilename. Both are relative paths; the working directory
+    is deliberately left unchanged.'''
+    def __init__(self, main):
+        'Remember <main>, the callable to run inside the daemon.'
+        self.main = main
+        self.running = False
+        self.pidfilename = "reg_dispatch.pid"
+        self.logfilename = "reg_dispatch.log"
+
+    def start(self, stdin=None, stdout=None, stderr=None):
+        '''Fork off a separate process, running our function.
+
+        stdin, stdout and stderr optionally name files to attach to the
+        daemon's standard streams. They default to /dev/null, the log
+        file, and the same file as stdout respectively.
+
+        Returns True in the launching process once the first fork has
+        succeeded, and False if a pidfile already exists or that fork
+        failed. The intermediate process exits instead of returning.'''
+
+        # make sure we're not already running
+        if os.path.exists(self.pidfilename):
+            sys.stderr.write("daemon already running.\n")
+            sys.stderr.write("stop it, or remove the stale pidfile ")
+            sys.stderr.write(self.pidfilename + "\n")
+            # refuse to launch a second copy and overwrite the pidfile
+            return False
+
+        # fork from the parent process and return
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # todo: grab the pid and return
+                # after the daemon is running
+                return True
+        except OSError, e:
+            sys.stderr.write("initial daemonization fork failed: (%d) %s\n" %
+                (e.errno, e.strerror))
+            return False
+
+        # Decouple ourselves from the parent environment
+        # normally we chdir("/") to avoid blocking the unmount of our
+        # launch directory, but we can't do any work without it
+        # and it makes things simpler to use the cwd
+        #os.chdir("/")
+        os.umask(0)
+        os.setsid()
+
+        # Do a second fork so the daemon is not a session leader and
+        # therefore cannot acquire a controlling terminal
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # record the child's pid
+                pidfile = open(self.pidfilename, "w")
+                pidfile.write(str(pid) + "\n")
+                pidfile.close()
+                # exit the second parent
+                sys.exit(0)
+        except OSError, e:
+            sys.stderr.write("second daemonization fork failed: (%d) %s\n" %
+                (e.errno, e.strerror))
+            sys.exit(1)
+
+        # redirect standard file descriptors
+        if not stdin: stdin = "/dev/null"
+        if not stdout: stdout = self.logfilename
+        if not stderr: stderr = stdout
+        si = file(stdin, 'r')
+        so = file(stdout, 'a')
+        se = file(stderr, 'a')
+        os.dup2(si.fileno(), sys.stdin.fileno())
+        os.dup2(so.fileno(), sys.stdout.fileno())
+        os.dup2(se.fileno(), sys.stderr.fileno())
+
+        # execute the requested main function
+        self.main()
+
+    def stop(self):
+        '''Stop a running daemon.
+
+        Sends SIGHUP to the pid recorded in the pidfile, then removes
+        the pidfile. Returns True if the signal was delivered and False
+        otherwise, so the caller's check of the result does not report
+        a successful stop as a failure.'''
+        try:
+            pidfile = open(self.pidfilename, "r")
+        except IOError:
+            sys.stderr.write("no pidfile or daemon not running.\n")
+            return False
+        line = pidfile.readline()
+        pidfile.close()
+        try:
+            pid = int(line)
+        except ValueError:
+            # empty or corrupt pidfile
+            pid = 0
+        stopped = False
+        if pid:
+            try:
+                os.kill(pid, signal.SIGHUP)
+                stopped = True
+            except OSError, e:
+                # stale pidfile: the recorded process is already gone
+                sys.stderr.write("couldn't signal daemon %d: %s\n" %
+                    (pid, e.strerror))
+        else:
+            sys.stderr.write("no pidfile or daemon not running.\n")
+        os.unlink(self.pidfilename)
+        return stopped
+
+def update(rev):
+ 'update the executable to revision <rev>'
+ # NOTE(review): "gs -r" contains a space, so the 'cd' below receives
+ # two words; mainloop works in "ghostpcl-r<rev>" directories. Confirm
+ # the intended directory name.
+ workdir = "gs -r" + rev
+ # both exit statuses are stored but never checked or returned
+ svn = os.system("cd " + workdir + " && svn up -r" + rev)
+ make = os.system("cd " + workdir +" && make clean && nice make debug")
+
+def getrev(cachedir="queue.pcl"):
+    '''Read the queue directory and select a revision to test.
+
+    Creates <cachedir> if it is missing. Returns the alphabetically
+    first entry name after removing it from the queue, or None when
+    the queue is empty.'''
+    if not os.path.exists(cachedir): os.mkdir(cachedir)
+    revs = os.listdir(cachedir)
+    # os.listdir() order is arbitrary; sort so entries are taken in a
+    # repeatable order, as the other queue readers do
+    revs.sort()
+    try:
+        rev = revs[0]
+        os.unlink(os.path.join(cachedir, rev))
+    except IndexError:
+        rev = None
+    return rev
+
+def mailfile(file, rev=None):
+    '''Mail out the report.
+
+    Pipes <file> through the 'mail' command to the regression list,
+    naming ghostpcl revision <rev> in the subject when one is given.'''
+    cmd = 'cat ' + file + ' '
+    cmd += '| mail -s "cluster regression'
+    if rev:
+        cmd += ' ghostpcl-r' + rev
+    cmd += ' (xefitra)" '
+    #cmd += 'giles at ghostscript.com'
+    # one address; split by spaces it would be read as three recipients
+    cmd += 'gs-regression@ghostscript.com'
+    os.system(cmd)
+
+def ircfile(file, rev=None):
+    '''Notify CIA, and thus irc, with the contents of <file>.
+
+    Nothing is sent for an empty file. Delivery is best effort: a
+    failure is printed to the log and otherwise ignored.'''
+    report = open(file)
+    try:
+        msg = report.read()
+    finally:
+        report.close()
+    if msg:
+        try:
+            # set the module on the instance: passing it as a keyword
+            # raised a TypeError that was then silently swallowed
+            notice = ciatest.Message(msg, rev=rev)
+            notice.module = 'ghostpcl'
+            notice.send()
+        except Exception, e:
+            print 'CIA notification failed:', e
+
+def choosecluster():
+ '''Decide how many nodes of which cluster to run on.
+ returns a (cluster_name, node_count) tuple.'''
+ # figure out how many nodes are free
+ # a matching line starts with whitespace and a cluster name and ends
+ # with two integers, read as the processor count and the free count
+ r = re.compile('^\s+(?P<cluster>\w+).*\s+(?P<procs>\d+)\s+(?P<free>\d+)\s*$')
+ clusters=[]
+ nodes = 0
+ # stays None, with nodes at 0, if no line reports free nodes
+ cluster = None
+ upnodes = os.popen("upnodes")
+ for line in upnodes.readlines():
+ m = r.match(line)
+ if m:
+ name = m.group("cluster")
+ procs = int(m.group("procs"))
+ free = int(m.group("free"))
+ # remember the cluster with the most free nodes
+ if free > nodes and name != 'total':
+ nodes = free
+ cluster = name
+ # 'clusters' is collected but not used by the return value
+ clusters.append((name,procs,free))
+ return (cluster, nodes)
+
+# Print a short usage message; <name> defaults to the script's argv[0]
+# as it was when this file was loaded.
+def usage(name=sys.argv[0]):
+ print "Usage: %s <revision>" % name
+ print "launch a regression run on tticluster.com"
+ print "testing gs svn rev <revision> against the default baseline"
+
+def log(msg):
+ '''print a timestamped log message. We use this for major tasks,
+ and a normal print command for progress and error messages.'''
+ # msg is concatenated directly, so it must already be a string
+ print '[' + time.ctime() + '] ' + msg
+
+def pbsjob(cmd, resources=None, stdout=None, stderr=None, mpi=True):
+    '''Write the job script 'regress.pbs' for <cmd> and submit it.
+
+    resources is the PBS resource list; when it is omitted,
+    choosecluster() picks the cluster and node count. stdout and
+    stderr name the job's output files, and equal names merge the two
+    streams. With mpi set the command is launched through mpiexec.
+    The job is submitted with qsub; nothing is returned.'''
+    if not resources:
+        cluster, nodes = choosecluster()
+        # parenthesized so the node-count test covers both clusters;
+        # unparenthesized, 'green' took this branch with any count
+        if nodes > 1 and (cluster == 'red' or cluster == 'green'):
+            # red reports two cpus per node
+            nodes /= 2
+            ppn = ':ppn=2'
+        else:
+            ppn = ''
+        resources = 'nodes=%d:%s:run%s,walltime=20:00' % (nodes, cluster, ppn)
+        print 'requesting', nodes, 'nodes on', cluster
+    f = open('regress.pbs', 'w')
+    f.write('#PBS -l ' + resources)
+    if stdout:
+        f.write(' -o ' + stdout)
+    if stderr:
+        if stderr == stdout:
+            # same file for both streams: join them
+            f.write(' -j oe')
+        else:
+            f.write(' -e ' + stderr)
+    f.write(' -d ' + os.getcwd())
+    f.write('\n\n')
+    if mpi:
+        f.write('mpiexec -comm mpich2-pmi ')
+        f.write(' -nostdin -kill -nostdout')
+        f.write(' ')
+    f.write(cmd)
+    f.write('\n')
+    f.close()
+    os.system('qsub regress.pbs')
+
+def build(workdir=None, clean=False):
+ 'compile an executable from the current source'
+ # with <clean> set, rebuild from scratch including ./autogen.sh;
+ # with <workdir> set, run the build from inside that directory
+ if clean:
+ cmd = "make clean && nice ./autogen.sh && nice make"
+ else:
+ cmd = "nice make"
+ if workdir:
+ cmd = "cd " + workdir + " && " + cmd
+ # the local-build branch is disabled by the constant condition
+ if False:
+ # build on the dispatch host
+ make = os.system(cmd)
+ make = make >> 8
+ else:
+ # FIXME: alternate build on a compile node
+ report = 'update.log'
+ resources = 'nodes=1:compile'
+ cmd += "\nexit"
+ if os.path.exists(report): os.unlink(report)
+ # NOTE(review): pbsjob() has no return statement, so 'make' is None
+ # here and the failure branch below cannot trigger on this path
+ make = pbsjob(cmd, resources, stdout=report, mpi=False)
+ # the job's output file appearing is used as the completion signal
+ while not os.path.exists(report): time.sleep(5)
+ if make:
+ log("build failed! exit code " + str(make))
+ return False
+ # update successful
+ return True
+
+
+# Take entries from the pcl queue forever. For each one: build the
+# matching ghostpcl-r<rev> working copy, write and submit a PBS job that
+# runs regress.py in it, wait for the report file, and send it to irc.
+def mainloop():
+ log("starting up")
+ # 'doing' limits the idle message to one print per idle period
+ doing = True
+ while True:
+ rev = getrev()
+ if rev:
+ doing = True
+ workdir = "ghostpcl-r" + rev
+ # skip entries whose working copy directory does not exist;
+ # getrev() has already removed the entry from the queue
+ if not os.path.exists(workdir):
+ print "couldn't find requested working copy '%s'\n" % workdir
+ continue
+ log("building " + workdir)
+ # the result of build() is not checked
+ build(workdir)
+ log("build complete")
+ # seed the working copy with our baseline if it has none
+ if not os.path.exists(os.path.join(workdir, "reg_baseline.txt")):
+ os.system("cp reg_baseline.txt " + workdir)
+ log("running regression on ghostpcl-r" + rev)
+ start = time.time()
+ report = "regression-r" + rev + ".log"
+ # remove the report if it exists since we use this to check completion
+ if os.path.exists(report): os.unlink(report)
+ # the job script is written inline here instead of through pbsjob()
+ f = open('regress.pbs', 'w')
+ cluster, nodes = choosecluster()
+ if nodes > 1 and (cluster == 'red' or cluster == 'green'):
+ # red reports two cpus per node
+ nodes /= 2
+ ppn = ':ppn=2'
+ else:
+ ppn = ''
+ f.write('#PBS -l nodes=%s:%s:run%s,walltime=20:00' %
+ (nodes, cluster, ppn))
+ f.write(' -o ' + report)
+ #f.write(' -j oe')
+ #f.write(' -e /dev/null')
+ f.write(' -e ' + report + '.err')
+ # the job runs from inside the working copy
+ f.write(' -d ' + os.path.join(os.getcwd(), workdir))
+ f.write('\n\n')
+ f.write('mpiexec -comm mpich2-pmi ')
+ f.write(' -nostdin -kill -nostdout')
+ f.write(' bwpython ../regress.py')
+ f.write(' --batch --update')
+ f.write(' --exe main/obj/pcl6')
+ f.write('\n')
+ f.close()
+ print 'requesting', nodes, 'nodes on', cluster
+ os.system('qsub regress.pbs')
+ # wait for the run to finish
+ while not os.path.exists(report):
+ time.sleep(20)
+ print "report is ready as '" + report + "'. total time %d seconds" % int(time.time() - start)
+ ircfile(report, rev)
+ #mailfile(report, rev)
+ # copy the working copy's baseline back to this directory
+ os.system("cp " + os.path.join(workdir, "reg_baseline.txt ") + " .")
+ else:
+ if doing:
+ print "-- nothing to do --"
+ sys.stdout.flush()
+ doing = False
+ time.sleep(100)
+
+# Entry point: '-d' runs as a daemon ('-d stop' stops a running one);
+# with any other arguments the main loop runs in the foreground.
+if __name__ == '__main__':
+ if len(sys.argv) > 1 and sys.argv[1] == '-d':
+ daemon = Daemon(mainloop)
+ if len(sys.argv) > 2 and sys.argv[2] == 'stop':
+ result = daemon.stop()
+ else:
+ result = daemon.start()
+ # NOTE(review): this message mentions only starting, but 'result'
+ # is also assigned from daemon.stop() above
+ if not result:
+ print "couldn't start daemon!"
+ sys.exit(1)
+ else:
+ # don't run as a daemon by default
+ mainloop()
Property changes on: trunk/cluster/tti/reg_dispatch-pcl.py
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/cluster/tti/reg_dispatch.py
===================================================================
--- trunk/cluster/tti/reg_dispatch.py 2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/tti/reg_dispatch.py 2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,272 @@
+#!/usr/bin/env python
+
+# regression test dispatch script
+# we can be used to launch a parallel regression
+# or in daemon mode to run regressions on specific revisions
+
+import os, sys, signal
+import re, time
+import ciatest
+
+class Daemon:
+    '''Class for forking off a daemon process.
+
+    The daemon's pid is written to self.pidfilename and, unless other
+    files are given to start(), its output is appended to
+    self.logfilename. Both are relative paths; the working directory
+    is deliberately left unchanged.'''
+    def __init__(self, main):
+        'Remember <main>, the callable to run inside the daemon.'
+        self.main = main
+        self.running = False
+        self.pidfilename = "reg_dispatch.pid"
+        self.logfilename = "reg_dispatch.log"
+
+    def start(self, stdin=None, stdout=None, stderr=None):
+        '''Fork off a separate process, running our function.
+
+        stdin, stdout and stderr optionally name files to attach to the
+        daemon's standard streams. They default to /dev/null, the log
+        file, and the same file as stdout respectively.
+
+        Returns True in the launching process once the first fork has
+        succeeded, and False if a pidfile already exists or that fork
+        failed. The intermediate process exits instead of returning.'''
+
+        # make sure we're not already running
+        if os.path.exists(self.pidfilename):
+            sys.stderr.write("daemon already running.\n")
+            sys.stderr.write("stop it, or remove the stale pidfile ")
+            sys.stderr.write(self.pidfilename + "\n")
+            # refuse to launch a second copy and overwrite the pidfile
+            return False
+
+        # fork from the parent process and return
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # todo: grab the pid and return
+                # after the daemon is running
+                return True
+        except OSError, e:
+            sys.stderr.write("initial daemonization fork failed: (%d) %s\n" %
+                (e.errno, e.strerror))
+            return False
+
+        # Decouple ourselves from the parent environment
+        # normally we chdir("/") to avoid blocking the unmount of our
+        # launch directory, but we can't do any work without it
+        # and it makes things simpler to use the cwd
+        #os.chdir("/")
+        os.umask(0)
+        os.setsid()
+
+        # Do a second fork so the daemon is not a session leader and
+        # therefore cannot acquire a controlling terminal
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # record the child's pid
+                pidfile = open(self.pidfilename, "w")
+                pidfile.write(str(pid) + "\n")
+                pidfile.close()
+                # exit the second parent
+                sys.exit(0)
+        except OSError, e:
+            sys.stderr.write("second daemonization fork failed: (%d) %s\n" %
+                (e.errno, e.strerror))
+            sys.exit(1)
+
+        # redirect standard file descriptors
+        if not stdin: stdin = "/dev/null"
+        if not stdout: stdout = self.logfilename
+        if not stderr: stderr = stdout
+        si = file(stdin, 'r')
+        so = file(stdout, 'a')
+        se = file(stderr, 'a')
+        os.dup2(si.fileno(), sys.stdin.fileno())
+        os.dup2(so.fileno(), sys.stdout.fileno())
+        os.dup2(se.fileno(), sys.stderr.fileno())
+
+        # execute the requested main function
+        self.main()
+
+    def stop(self):
+        '''Stop a running daemon.
+
+        Sends SIGHUP to the pid recorded in the pidfile, then removes
+        the pidfile. Returns True if the signal was delivered and False
+        otherwise, so the caller's check of the result does not report
+        a successful stop as a failure.'''
+        try:
+            pidfile = open(self.pidfilename, "r")
+        except IOError:
+            sys.stderr.write("no pidfile or daemon not running.\n")
+            return False
+        line = pidfile.readline()
+        pidfile.close()
+        try:
+            pid = int(line)
+        except ValueError:
+            # empty or corrupt pidfile
+            pid = 0
+        stopped = False
+        if pid:
+            try:
+                os.kill(pid, signal.SIGHUP)
+                stopped = True
+            except OSError, e:
+                # stale pidfile: the recorded process is already gone
+                sys.stderr.write("couldn't signal daemon %d: %s\n" %
+                    (pid, e.strerror))
+        else:
+            sys.stderr.write("no pidfile or daemon not running.\n")
+        os.unlink(self.pidfilename)
+        return stopped
+
+# PBS job server utilities
+
+def choosecluster():
+    '''Decide how many nodes of which cluster to run on.
+    returns a (cluster_name, node_count) tuple, which is (None, 0)
+    when the 'upnodes' output reports no cluster with free nodes.'''
+    # figure out how many nodes are free
+    # a matching line starts with whitespace and a cluster name and ends
+    # with two integers, read as the processor count and the free count
+    r = re.compile('^\s+(?P<cluster>\w+).*\s+(?P<procs>\d+)\s+(?P<free>\d+)\s*$')
+    clusters=[]
+    nodes = 0
+    # give the result a value even if no line matches, rather than
+    # failing on an unbound name at the return below
+    cluster = None
+    upnodes = os.popen("upnodes")
+    for line in upnodes.readlines():
+        m = r.match(line)
+        if m:
+            name = m.group("cluster")
+            procs = int(m.group("procs"))
+            free = int(m.group("free"))
+            # remember the cluster with the most free nodes
+            if free > nodes and name != 'total':
+                nodes = free
+                cluster = name
+            clusters.append((name,procs,free))
+    upnodes.close()
+    return (cluster, nodes)
+
+def pbsjob(cmd, resources=None, stdout=None, stderr=None, mpi=True):
+    '''Write a PBS job script for <cmd> and submit it.
+
+    resources is the PBS resource list; when it is omitted,
+    choosecluster() picks the cluster and node count. stdout and
+    stderr name the job's output files, and equal names merge the two
+    streams. The script is written to '<stdout>.pbs', or 'regress.pbs'
+    without a stdout name. With mpi set the command is launched
+    through mpiexec. The job is submitted with qsub; nothing is
+    returned.'''
+    if not resources:
+        cluster, nodes = choosecluster()
+        # parenthesized so the node-count test covers both clusters;
+        # unparenthesized, 'green' took this branch with any count
+        if nodes > 1 and (cluster == 'red' or cluster == 'green'):
+            # red reports two cpus per node
+            nodes /= 2
+            ppn = ':ppn=2'
+        else:
+            ppn = ''
+        resources = 'nodes=%d:%s:run%s,walltime=20:00' % (nodes, cluster, ppn)
+        print 'requesting', nodes, 'nodes on', cluster
+    if stdout: jobname = stdout + '.pbs'
+    else: jobname = 'regress.pbs'
+    f = open(jobname, 'w')
+    f.write('#PBS -l ' + resources)
+    if stdout:
+        f.write(' -o ' + stdout)
+    if stderr:
+        if stderr == stdout:
+            # same file for both streams: join them
+            f.write(' -j oe')
+        else:
+            f.write(' -e ' + stderr)
+    f.write(' -d ' + os.getcwd())
+    f.write('\n\n')
+    if mpi:
+        f.write('mpiexec -comm mpich2-pmi ')
+        f.write(' -nostdin -kill -nostdout')
+        f.write(' ')
+    f.write(cmd)
+    f.write('\n')
+    f.close()
+    os.system('qsub ' + jobname)
+
+
+# regression setup and reporting
+
+def update(rev):
+ 'update the source to revision <rev>'
+ # runs in the current directory; returns True on success and False,
+ # after logging, when svn exits with a non-zero status
+ svn = os.system("svn up -r" + rev)
+ if svn:
+ log("SVN update failed!")
+ return False
+ return True
+
+def build(clean=False):
+ 'compile an executable from the current source'
+ # with <clean> set, rebuild from scratch including ./autogen.sh
+ if clean:
+ cmd = "make clean && nice ./autogen.sh && nice make"
+ else:
+ cmd = "nice make"
+ # the local-build branch is disabled by the constant condition
+ if False:
+ # build on the dispatch host
+ make = os.system(cmd)
+ make = make >> 8
+ else:
+ # build on a compile node
+ resources = 'nodes=1:compile'
+ report = 'update.log'
+ if os.path.exists(report): os.unlink(report)
+ # NOTE(review): pbsjob() has no return statement, so 'make' is None
+ # here and the failure branch below cannot trigger on this path
+ make = pbsjob(cmd, resources, stdout=report, stderr=report, mpi=False)
+ # the job's output file appearing is used as the completion signal
+ while not os.path.exists(report):
+ time.sleep(5)
+ if make:
+ log("build failed! exit code " + str(make))
+ return False
+ # update successful
+ return True
+
+def getrev(cachedir="../queue.gs"):
+ 'Read the queue directory and select a revision to test'
+ # returns the alphabetically first entry name, or None when the
+ # directory is empty
+ revs = os.listdir(cachedir)
+ # we would ideally sort by mtime, but for now just alphabetical
+ revs.sort()
+ try:
+ rev = revs[0]
+ # selecting an entry also removes it from the queue
+ os.unlink(os.path.join(cachedir, rev))
+ except IndexError:
+ rev = None
+ return rev
+
+def mailfile(file, rev=None):
+    '''Mail out the report.
+
+    Pipes <file> through the 'mail' command to the regression list,
+    naming gs revision <rev> in the subject when one is given.'''
+    cmd = 'cat ' + file + ' '
+    cmd += '| mail -s "cluster regression'
+    if rev:
+        cmd += ' gs-r' + rev
+    cmd += ' (xefitra)" '
+    #cmd += 'giles at ghostscript.com'
+    # one address; split by spaces it would be read as three recipients
+    cmd += 'gs-regression@ghostscript.com'
+    os.system(cmd)
+
+def irclog(msg, rev=None):
+ 'Notify CIA and thus irc of a message'
+ # an empty message sends nothing; delivery is best effort, so every
+ # exception raised while building or sending is discarded
+ if msg:
+ try:
+ ciatest.Message(msg, rev).send()
+ except:
+ # ignore errors, the server sometimes barfs
+ pass
+
+def ircfile(file, rev=None):
+ 'Send a result file to CIA and thus irc'
+ # the whole file is read into one string and passed to irclog()
+ msg = ''.join(open(file).readlines())
+ irclog(msg, rev)
+
+# Print a short usage message; <name> defaults to the script's argv[0]
+# as it was when this file was loaded.
+def usage(name=sys.argv[0]):
+ print "Usage: %s <revision>" % name
+ print "launch a regression run on tticluster.com"
+ print "testing gs svn rev <revision> against the default baseline"
+
+# Print <msg> prefixed with the current time in brackets. msg is
+# concatenated directly, so it must already be a string.
+def log(msg):
+ print '[' + time.ctime() + '] ' + msg
+
+# Run one regression: update the source to <rev>, rebuild from clean,
+# submit regress.py as a PBS job, wait for <report> to appear, then mail
+# it and send it to irc. <rev> defaults to the next queue entry and
+# <report> to 'regression-r<rev>.log'.
+def runrev(rev=None, report=None):
+ # NOTE(review): getrev() returns None on an empty queue, which the
+ # string concatenation below would not accept
+ if not rev: rev = getrev()
+ if not report: report = "regression-r" + rev + ".log"
+ log("running regression on gs-r" + rev)
+ start = time.time()
+ # remove the report if it exists since we use this to check completion
+ if os.path.exists(report): os.unlink(report)
+ # update or build failures are announced on irc instead of running
+ if not update(rev): irclog("SVN update failed!", rev)
+ elif not build(clean=True): irclog("Build failed!", rev)
+ else:
+ cmd = 'bwpython ../regress.py --batch --update'
+ pbsjob(cmd, resources=None, stdout=report)
+ # wait for the run to finish
+ while not os.path.exists(report):
+ time.sleep(20)
+ print "report is ready as '" + report + "'. total time %d seconds" % int(time.time() - start)
+ mailfile(report, rev)
+ ircfile(report, rev)
+
+# Take revisions from the queue forever, running a regression on each
+# and sleeping while the queue is empty.
+def mainloop():
+ log("starting up")
+ # 'doing' limits the idle message to one print per idle period
+ doing = True
+ while True:
+ rev = getrev()
+ if rev:
+ doing = True
+ report = "regression-r" + rev + ".log"
+ runrev(rev, report)
+ else:
+ if doing:
+ print "-- nothing to do --"
+ sys.stdout.flush()
+ doing = False
+ time.sleep(100)
+
+# Entry point: '-d' runs as a daemon ('-d stop' stops a running one),
+# '-r <rev>' runs a single revision, and anything else runs the main
+# loop in the foreground.
+if __name__ == '__main__':
+ if len(sys.argv) > 1 and sys.argv[1] == '-d':
+ daemon = Daemon(mainloop)
+ if len(sys.argv) > 2 and sys.argv[2] == 'stop':
+ result = daemon.stop()
+ else:
+ result = daemon.start()
+ # NOTE(review): this message mentions only starting, but 'result'
+ # is also assigned from daemon.stop() above
+ if not result:
+ print "couldn't start daemon!"
+ sys.exit(1)
+ elif len(sys.argv) > 2 and sys.argv[1] == '-r':
+ # run a specific revision and quit
+ rev = sys.argv[2]
+ runrev(rev)
+ else:
+ # run with queues, but on the console for debugging
+ mainloop()
Property changes on: trunk/cluster/tti/reg_dispatch.py
___________________________________________________________________
Name: svn:executable
+ *
Added: trunk/cluster/tti/regress.py
===================================================================
--- trunk/cluster/tti/regress.py 2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/tti/regress.py 2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,395 @@
+#!/usr/bin/env python
+
+import os
+import time
+import sys
+
+try:
+ from mpi4py import MPI
+except ImportError:
+ class DummyMPI:
+ '''A dummy MPI class for running serial jobs.'''
+ size = 1
+ rank = 0
+ MPI = DummyMPI()
+
+class Conf:
+ def __init__(self):
+ # set defaults
+ self.batch = False
+ self.update = False
+ self.verbose = False
+ self.testpath = os.path.join(os.environ['HOME'], 'tests')
+ #self.exe = './language_switch/obj/pspcl6'
+ self.exe = './bin/gs -q -I$HOME/fonts'
+ self.test = 'comparefiles'
+ self.device = 'ppmraw'
+ self.dpi = 600
+
+ def parse(self, args):
+ '''Parse the command line for configuration switches
+
+ For example:
+ conf = Conf()
+ conf.parse(sys.argv)
+ '''
+
+ for index in xrange(1,len(args)):
+ arg = args[index]
+ if arg[:2] == '--':
+
+ # support generic '--opt=val'
+ sep = arg.find('=')
+ if sep > 0:
+ opt = arg[2:sep]
+ val = arg[sep+1:]
+ else:
+ opt = arg[2:]
+
+ # for select options support '--opt val'
+ if opt in ('exe', 'test'):
+ try:
+ val = args[index+1]
+ except IndexError:
+ print 'Warning:', opt, 'requires a specific value.'
+ val = None
+ else:
+ # default to postitive boolean value
+ val = True
+
+ # for select options, accumulate the values
+ if opt in ('test'):
+ opt += 's' # pluralize collections
+ if not hasattr(self, opt):
+ self.__dict__[opt] = []
+ self.__dict__[opt].append(val)
+ else:
+ # set an attribute on ourselves with the option value
+ self.__dict__[opt] = val
+
+ # finally, set defaults for unset accumulating options
+ if not hasattr(self, 'tests'):
+ self.tests = []
+ # guess appropriate defaults based on the executable
+ basename = os.path.basename(self.exe.split()[0])
+ if basename.find('pcl') >= 0:
+ self.tests += ['pcl/pcl5cfts/fts.*',
+ 'pcl/pcl5efts/fts.*',
+ 'pcl/pcl5ccet/*.BIN']
+ if basename.find('ps') >= 0 or basename.find('gs') >= 0:
+ self.tests += ['ps/ps3cet/*.PS']
+ # run the normal comparefiles suite for now
+ self.tests = ['comparefiles/*.ps',
+ 'comparefiles/*.pdf',
+ 'comparfiles/*.ai']
+
+# global configuration instance, read by the classes and functions below.
+# The command line is parsed as soon as this module is loaded.
+conf = Conf()
+conf.parse(sys.argv)
+
+
+# results of tests are stored as classes
+
+class TestResult:
+ 'generic test result class'
+ def __init__(self, msg=None):
+ self.msg = msg
+ def __str__(self):
+ return 'no result'
+
+class OKResult(TestResult):
+ 'result class for successful tests'
+ def __str__(self):
+ return 'ok'
+
+class FailResult(TestResult):
+ 'result class for failed tests'
+ def __str__(self):
+ return 'FAIL'
+
+class ErrorResult(TestResult):
+ 'result class for tests that did not complete'
+ def __str__(self):
+ return 'ERROR'
+
+class NewResult(TestResult):
+ 'result class for tests that are new and have no expected result'
+ def __str__(self):
+ return 'new (%s)' % self.msg
+
+class SelfTest:
+ 'generic class for self tests'
+ def __init__(self):
+ self.result = None
+ def description(self):
+ 'returns a short name for the test'
+ return "generic self test"
+ def run(self):
+ 'call this to execute the test'
+ self.result = OKResult()
+
+class SelfTestSuite:
+ '''Generic class for running a collection of SelfTest instances.'''
+ # NOTE(review): indentation was lost in this archived copy; the comments
+ # below describe only what the statements themselves show.
+
+ def __init__(self, stream=sys.stderr):
+ # stream is stored but not used by any method of this class
+ self.stream = stream
+ # tests: queued tests before run(), completed tests afterwards
+ self.tests = []
+ # completed tests sorted by outcome (see addResult)
+ self.fails = []
+ self.errors = []
+ self.news = []
+ # wall-clock seconds taken by the last run()
+ self.elapsed = 0.0
+
+ def addTest(self, test):
+ # queue a test to be executed by run()
+ self.tests.append(test)
+
+ def addResult(self, test):
+ # Record a completed test.  A false value (such as None) is ignored.
+ # Outside batch mode a one-line status is printed per test.
+ if test:
+ if not conf.batch:
+ print test.description() + ' ... ' + str(test.result)
+ self.tests.append(test)
+ if isinstance(test.result, ErrorResult):
+ self.errors.append(test)
+ elif isinstance(test.result, NewResult):
+ self.news.append(test)
+ elif not isinstance(test.result, OKResult):
+ # treat everything else as a failure
+ self.fails.append(test)
+
+ def run(self):
+ '''Run each test in sequence.'''
+ starttime = time.time()
+ # addResult() appends to self.tests, so take the queue aside first
+ tests = self.tests
+ self.tests = []
+ for test in tests:
+ test.run()
+ self.addResult(test)
+ self.elapsed = time.time() - starttime
+ self.report()
+
+ def report(self):
+ # Print a summary of the run: totals, then failures, errors and new
+ # files.  In batch mode the individual failed and errored tests are
+ # listed as well.
+ if not conf.batch:
+ print '-'*72
+ print 'ran %d tests in %.3f seconds on %d nodes\n' % \
+ (len(self.tests), self.elapsed, MPI.size)
+ if self.fails:
+ print 'FAILED %d of %d tests' % \
+ (len(self.fails),len(self.tests))
+ if conf.batch:
+ # NOTE(review): relies on a 'file' attribute, which SelfTest itself
+ # does not define -- only safe for tests that set it; confirm.
+ for test in self.fails:
+ print ' ' + test.file
+ print
+ if self.errors:
+ print 'ERROR running %d of %d tests' % \
+ (len(self.errors),len(self.tests))
+ if conf.batch:
+ for test in self.errors:
+ print ' ' + test.description()
+ print test.result.msg
+ print
+ if not self.fails and not self.errors and not self.news:
+ print 'PASSED all %d tests' % len(self.tests)
+ if self.news:
+ print '%d NEW files with no previous result' % len(self.news)
+ print
+
+class MPITestSuite(SelfTestSuite):
+ '''Use MPI to run multiple tests in parallel.'''
+ # Rank 0 hands out tests and collects results; every other rank runs
+ # whatever it is sent.  Test objects themselves are the messages.
+
+ def run(self):
+ starttime = time.time()
+ if MPI.rank > 0:
+ # daughter nodes run requested tests
+ # Each round sends back the previous test (None the first time, since
+ # nothing has been run yet) and waits for the next one; a false reply
+ # ends the loop.
+ test = None
+ while True:
+ MPI.COMM_WORLD.Send(test, dest=0)
+ test = MPI.COMM_WORLD.Recv(source=0)
+ if not test:
+ break
+ test.run()
+ else:
+ # mother node hands out work and reports
+ # addResult() appends to self.tests, so take the queue aside first
+ tests = self.tests
+ self.tests = []
+ # while work remains: wait for any node to report, record what it
+ # sent (addResult ignores the initial None), and reply to that same
+ # node with the next test
+ while tests:
+ status = MPI.Status()
+ test = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE, status=status)
+ self.addResult(test)
+ MPI.COMM_WORLD.Send(tests.pop(0), dest=status.source)
+ # retrieve outstanding results and tell the nodes we're finished
+ # NOTE(review): the None goes to 'node', not to the rank whose message
+ # was just received; presumably fine because each worker sends one
+ # more message and gets exactly one None -- confirm.
+ for node in xrange(1, MPI.size):
+ test = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
+ self.addResult(test)
+ MPI.COMM_WORLD.Send(None, dest=node)
+ stoptime = time.time()
+ self.elapsed = stoptime - starttime
+ # only the mother node has the collected results to report
+ if MPI.rank == 0:
+ self.report()
+
+# specific code for our needs
+
+class md5Test(SelfTest):
+ '''Test class for running a file and comparing the output to an
+ expected value.'''
+
+ def __init__(self, file, md5sum, dpi=600, device="ppmraw"):
+ SelfTest.__init__(self)
+ self.file = file
+ self.md5sum = md5sum
+ self.dpi = dpi
+ self.exe = conf.exe
+ self.opts = "-dQUIET -dSAFER -dNOPAUSE -dBATCH -K1000000"
+ self.opts += " -dSAFER -dBATCH"
+ self.opts += " -Z@"
+ self.opts += " -sDEVICE=%s -r%d" % (device, dpi)
+ #self.psopts = '-dMaxBitmap=40000000 -dJOBSERVER ./lib/gs_cet.ps'
+ self.psopts = '-dMaxBitmap=30000000 -dNOOUTERSAVE -dJOBSERVER -c false 0 startjob pop -f'
+
+ def description(self):
+ return 'Checking ' + self.file
+
+ def run(self):
+ scratch = os.path.join('/tmp', os.path.basename(self.file) + '.md5')
+ # add psopts if it's a postscript file
+ if self.file[-3:].lower() == '.ps' or \
+ self.file[-4:].lower() == '.eps' or \
+ self.file[-4:].lower() == '.pdf' or \
+ self.file[-3:].lower() == '.ai':
+ cmd = '%s %s -sOutputFile="|md5sum>%s" %s - < %s ' % \
+ (self.exe, self.opts, scratch, self.psopts, self.file)
+ else:
+ cmd = '%s %s -sOutputFile="|md5sum>%s" %s' % \
+ (self.exe, self.opts, scratch, self.file)
+ run = os.popen(cmd)
+ msg = run.readlines()
+ code = run.close()
+ if code:
+ self.result = ErrorResult(''.join(msg))
+ return
+ try:
+ checksum = open(scratch)
+ md5sum = checksum.readline().split()[0]
+ checksum.close()
+ os.unlink(scratch)
+ except IOError:
+ self.result = ErrorResult('no output')
+ return
+ if not self.md5sum:
+ self.result = NewResult(md5sum)
+ return
+ if self.md5sum == md5sum:
+ self.result = OKResult(md5sum)
+ else:
+ self.result = FailResult(md5sum)
+
+class DB:
+ '''class representing an md5 sum database'''
+
+ def __init__(self):
+ self.store = None
+ self.db = {}
+
+ def load(self, store='reg_baseline.txt'):
+ self.store = store
+ try:
+ f = open(self.store)
+ except IOError:
+ print 'WARNING: could not open baseline database', self.store
+ return
+ for line in f.readlines():
+ if line[:1] == '#': continue
+ fields = line.split()
+ try:
+ file = fields[0].strip()
+ md5sum = fields[1].strip()
+ self.db[file] = md5sum
+ except IndexError:
+ pass
+ f.close()
+
+ def save(self, store=None):
+ if not store:
+ store = self.store
+ f = open(store, 'w')
+ f.write('# regression test baseline\n')
+ for key in self.db.keys():
+ f.write(str(key) + ' ' + str(self.db[key]) + '\n')
+ f.close()
+
+ # provide a dictionary interface
+ def __getitem__(self, key):
+ try:
+ value = self.db[key]
+ except KeyError:
+ value = None
+ return value
+
+ def __setitem__(self, key, value):
+ self.db[key] = value
+
+
+def run_regression():
+ 'run normal set of regressions'
+ # NOTE(review): indentation was lost in this archived copy; in particular
+ # whether db.save() runs only under conf.update cannot be told here.
+ from glob import glob
+ # use the MPI suite only when more than one process is available
+ if MPI.size > 1:
+ suite = MPITestSuite()
+ else:
+ suite = SelfTestSuite()
+ # rank 0 loads the baseline and queues one md5Test per matching file;
+ # db[file] is None for files with no baseline entry
+ if MPI.rank == 0:
+ db = DB()
+ db.load()
+ for test in conf.tests:
+ for file in glob(os.path.join(conf.testpath,test)):
+ suite.addTest(md5Test(file, db[file], conf.dpi, conf.device))
+ if MPI.size > 1 and not conf.batch:
+ print 'running tests on %d nodes...' % MPI.size
+ suite.run()
+ if MPI.rank == 0:
+ # update the database with new files and save
+ # (result.msg holds the checksum computed by md5Test.run)
+ for test in suite.news:
+ db[test.file] = test.result.msg
+ # with --update, failed tests get their new checksum as the baseline
+ if conf.update:
+ if len(suite.fails):
+ print 'Updating baselines for the failed tests.'
+ for test in suite.fails:
+ db[test.file] = test.result.msg
+ db.save()
+
+
+# self test routines for the self test classes
+
+class RandomTest(SelfTest):
+ 'test class with random results for testing'
+ def description(self):
+ return 'random test result'
+ def run(self):
+ import random
+ options = ( OKResult(), FailResult(), ErrorResult(), TestResult() )
+ r = random.Random()
+ r.seed()
+ self.result = r.choice(options)
+
+def test_ourselves():
+ print 'testing a single test:'
+ suite = SelfTestSuite()
+ suite.addTest(SelfTest())
+ suite.run()
+ print 'testing a set of tests:'
+ suite = SelfTestSuite()
+ for count in range(8):
+ suite.addTest(SelfTest())
+ suite.run()
+ print 'testing random results:'
+ suite = SelfTestSuite()
+ import random
+ r = random.Random()
+ r.seed()
+ for count in range(4 + int(r.random()*12)):
+ suite.addTest(RandomTest())
+ suite.run()
+
+
+# Do something useful when executed directly: run the regression suite.
+# The self test of the suite classes is left available but disabled.
+
+if __name__ == '__main__':
+ #test_ourselves()
+ run_regression()
More information about the gs-cvs
mailing list