[gs-cvs] rev 8126 - in trunk/cluster: . casper tti

giles at ghostscript.com giles at ghostscript.com
Sun Jul 15 07:04:35 PDT 2007


Author: giles
Date: 2007-07-15 07:04:35 -0700 (Sun, 15 Jul 2007)
New Revision: 8126

Added:
   trunk/cluster/README
   trunk/cluster/casper/
   trunk/cluster/casper/dispatch.py
   trunk/cluster/casper/push-new-rev.sh
   trunk/cluster/casper/queue-new-rev.sh
   trunk/cluster/casper/queue-rev-pcl.py
   trunk/cluster/tti/
   trunk/cluster/tti/ciatest.py
   trunk/cluster/tti/reg_dispatch-pcl.py
   trunk/cluster/tti/reg_dispatch.py
   trunk/cluster/tti/regress.py
Log:
Record the current state of the cluster regression test codebase.


Added: trunk/cluster/README
===================================================================
--- trunk/cluster/README	2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/README	2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,15 @@
+These are the tools we use to do cluster-based regression testing.
+
+The 'casper' directory contains a daemon that watches a queue directory 
+filled in by svn post-commit hook scripts, and copies them to the 
+corresponding queue on the cluster machines.
+
+The 'tti' directory contains the daemons which launch regressions based 
+on the queue files. Daemons can be launched interactively with no 
+arguments, or with '-d start' and '-d stop' to run in the background,
+detached from any terminal.
+
+This is not a live version of the source: commits here are not 
+automatically picked up by the running system. This is more a
+record of major code changes.
+

Added: trunk/cluster/casper/dispatch.py
===================================================================
--- trunk/cluster/casper/dispatch.py	2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/casper/dispatch.py	2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,144 @@
+#!/usr/bin/env python
+
+import os, sys, signal
+import time
+
+sleeptime = 30
+queuedir = '/home/regression/tti/queue'
+pclqueuedir = '/home/regression/tti/queue.pcl'
+ssh_id = '/home/regression/.ssh/ttitunnel'
+ssh_dest = 'atfxsw01 at tticluster.com'
+
+class Daemon:
+  '''Class for forking off a daemon process.'''
+  def __init__(self, main):
+    self.main = main
+    self.running = False
+    self.pidfilename = "reg_dispatch.pid"
+    self.logfilename = "reg_dispatch.log"
+  def start(self, stdin=None, stdout=None, stderr=None):
+    'Fork off a separate process, running our function.'
+
+    # make sure we're not already running
+    if os.path.exists(self.pidfilename):
+      sys.stderr.write("daemon already running.\n")
+      sys.stderr.write("stop it, or remove the stale pidfile ")
+      sys.stderr.write(self.pidfilename + "\n")
+
+    # fork from the parent process and return
+    try:
+      pid = os.fork()
+      if pid > 0:
+        # todo: grab the pid and return
+        # after the daemon is running
+        return True
+    except OSError, e:
+      sys.stderr.write("initial daemonization fork failed: (%d) %s\n" %
+        (e.errno, e.strerror))
+      return False
+
+    # Decouple ourselves from the parent environment
+    # normally we chdir("/") to avoid blocking the unmount of our
+    # launch directory, but we can't do any work without it
+    # and it makes things simpler to use the cwd
+    #os.chdir("/")
+    os.umask(0)
+    os.setsid()
+
+    # Do a second fork to avoid becoming a controlling terminal
+    try:
+      pid = os.fork()
+      if pid > 0:
+        # record the child's pid
+        pidfile = open(self.pidfilename, "w")
+        pidfile.write(str(pid) + "\n")
+        pidfile.close()
+        # exit the second parent
+        sys.exit(0)
+    except OSError, e:
+      sys.stderr.write("second daemonization fork failed: (%d) %s\n" %
+        (e.errno, e.strerror))
+      sys.exit(1)
+
+    # redirect standard file descriptors
+    if not stdin: stdin="/dev/null"
+    if not stdout: stdout = self.logfilename
+    if not stderr: stderr = stdout
+    si = file(stdin, 'r')
+    so = file(stdout, 'a')
+    se = file(stderr, 'a')
+    os.dup2(si.fileno(), sys.stdin.fileno())
+    os.dup2(so.fileno(), sys.stdout.fileno())
+    os.dup2(se.fileno(), sys.stderr.fileno())
+
+    # execute the reqested main function
+    self.main()
+
+  def stop(self):
+    'Stop a running daemon.'
+    pidfile = open(self.pidfilename, "r")
+    pid = int(pidfile.readline())
+    os.kill(pid, signal.SIGHUP)
+
+
+def getrev(queue=queuedir):
+  revs = os.listdir(queue)
+  # ideally we'd sort by mtime, but alphabetical for now
+  revs.sort()
+  try:
+    # call basename to strip hack attempts with relative paths
+    rev = os.path.basename(revs[0])
+  except IndexError:
+    rev = None
+  return rev
+
+def mainloop():
+  doing = True
+  while True:
+    # check for ghostscript runs
+    rev = getrev(queuedir)
+    pclrev = getrev(pclqueuedir)
+    if rev:
+      doing = True
+      print 'submitting gs-r' + rev
+      cmd = 'ssh -i ' + ssh_id + ' ' + ssh_dest + ' '
+      #cmd += 'touch regression/rev.queue/' + rev
+      cmd += 'touch ' + os.path.join('regression/queue.gs/', rev)
+      os.system(cmd)
+      os.unlink(os.path.join(queuedir,rev))
+      continue
+    elif pclrev:
+      doing = True
+      rev = pclrev
+      print 'updating ghostpcl-r' + rev
+      pclrev, gsrev = rev.split('+')
+      print rev, 'splits into gs rev', gsrev, 'and pcl rev', pclrev
+      cmd = 'svn update -r ' + pclrev + ' ghostpcl'
+      os.system(cmd)
+      # svn external will fail; override with a manual checkout
+      cmd = 'svn co http://svn.ghostscript.com:8080/ghostscript/trunk/gs -r ' + gsrev + ' ghostpcl/gs'
+      os.system(cmd)
+      print 'pushing update'
+      cmd = 'rsync -avz'
+      cmd += ' --exclude ufst --exclude .svn ghostpcl/*'
+      cmd += ' ' + ssh_dest + ':regression/ghostpcl-r' + rev + '/'
+      os.system(cmd)
+      print 'submitting ghostpcl-r' + rev
+      cmd = 'ssh -i ' + ssh_id + ' ' + ssh_dest + ' '
+      cmd += 'touch ' + os.path.join('regression/queue.pcl/', rev)
+      os.system(cmd)
+      os.unlink(os.path.join(pclqueuedir,rev))
+    else:
+      if doing:
+        print '-- nothing to do --'
+        doing = False
+      time.sleep(sleeptime)
+
+
+if __name__ == '__main__':
+  daemon = Daemon(mainloop)
+  result = daemon.start()
+  if not result:
+    print "couldn't start daemon!"
+    sys.exit(1)
+

Added: trunk/cluster/casper/push-new-rev.sh
===================================================================
--- trunk/cluster/casper/push-new-rev.sh	2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/casper/push-new-rev.sh	2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,13 @@
+#!/bin/sh
+
+# run me to request a cluster regression for an svn rev.
+rev=$1
+
+SSH_ID=/home/regression/.ssh/ttitunnel
+SSH_HOST=atfxsw01 at tticluster.com
+
+ssh -i $SSH_ID $SSH_HOST touch regression/rev.queue/$1
+# todo: check for failure, this doesn't work
+if ! test $?; then
+  echo "regression request submitted to the cluster"
+fi


Property changes on: trunk/cluster/casper/push-new-rev.sh
___________________________________________________________________
Name: svn:executable
   + *

Added: trunk/cluster/casper/queue-new-rev.sh
===================================================================
--- trunk/cluster/casper/queue-new-rev.sh	2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/casper/queue-new-rev.sh	2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,7 @@
+#!/bin/sh
+
+# run me to queue a new gs regression for testing
+
+QUEUE_DIR=/home/regression/tti/queue
+
+touch $QUEUE_DIR/$1

Added: trunk/cluster/casper/queue-rev-pcl.py
===================================================================
--- trunk/cluster/casper/queue-rev-pcl.py	2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/casper/queue-rev-pcl.py	2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+
+pclrepos = '/var/lib/svn-private/ghostpcl'
+gsrepos = '/var/lib/svn/ghostscript'
+queuedir = '/home/regression/tti/queue.pcl'
+
+import os
+
+def getrev(repos):
+  rev = os.popen('svnlook youngest ' + repos).readline().strip()
+  return rev
+
+def makerev():
+  pclrev = getrev(pclrepos)
+  gsrev = getrev(gsrepos)
+  return pclrev + '+' + gsrev
+
+os.system('touch ' + os.path.join(queuedir, makerev()))


Property changes on: trunk/cluster/casper/queue-rev-pcl.py
___________________________________________________________________
Name: svn:executable
   + *

Added: trunk/cluster/tti/ciatest.py
===================================================================
--- trunk/cluster/tti/ciatest.py	2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/tti/ciatest.py	2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+# test script for submitting messages to CIA
+
+server = 'http://cia.navi.cx'
+project = 'ghostscript'
+module = 'gs'
+
+import xmlrpclib
+import time
+
+class Message:
+  def __init__(self, log='', rev=''):
+    self.log = log
+    self.rev = rev
+    self.server = server
+    self.project = project
+    self.module = module
+    self.name = 'manual test'
+    self.version = '0.1'
+  def gen(self):
+    xml = ' <generator>\n'
+    xml += '  <name>' + str(self.name) + '</name>\n'
+    xml += '  <version>' + str(self.version) + '</version>\n'
+    xml += ' </generator>\n'
+    return xml
+  def source(self):
+    xml = ' <source>\n'
+    xml += '  <project>' + str(self.project) + '</project>\n'
+    xml += '  <module>' + str(self.module) + '</module>\n'
+    xml += ' </source>\n'
+    return xml
+  def message(self):
+    xml = '<message>\n'
+    xml += self.gen()
+    xml += self.source()
+    xml += ' <body>\n'
+    xml += '  <commit>\n'
+    if self.rev:
+      xml += '   <revision>' + self.rev + '</revision>\n'
+    xml += '   <author>regression</author>\n'
+    xml += '   <log>' + str(self.log) + '</log>\n'
+    xml += '  </commit>\n'
+    xml += ' </body>\n'
+    xml += ' <timestamp>' + str(int(time.time())) + '</timestamp>\n'
+    xml += '</message>\n'
+    return xml
+  def send(self, server = None):
+    if not server: server = self.server
+    xmlrpclib.ServerProxy(server).hub.deliver(self.message())
+  def __str__(self):
+    return self.message()
+
+def irc_report(filename, rev=''):
+  file = open(filename)
+  msg = ''.join(file.readlines())
+  m = Message(msg, rev)
+  m.send()
+
+if __name__ == '__main__':
+  filename = 'regression-r7832.log'
+  print 'reporting results from \'%s\' to irc' % filename
+  irc_report(filename, '7832')

Added: trunk/cluster/tti/reg_dispatch-pcl.py
===================================================================
--- trunk/cluster/tti/reg_dispatch-pcl.py	2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/tti/reg_dispatch-pcl.py	2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,285 @@
+#!/usr/bin/env python
+
+# regression test dispatch script
+# we can be used to launch a parallel regression
+# or in daemon mode to run regressions on specific revisions
+
+import os, sys, signal
+import re, time
+import ciatest
+
+class Daemon:
+  '''Class for forking off a daemon process.'''
+  def __init__(self, main):
+    self.main = main
+    self.running = False
+    self.pidfilename = "reg_dispatch.pid"
+    self.logfilename = "reg_dispatch.log"
+  def start(self, stdin=None, stdout=None, stderr=None):
+    'Fork off a separate process, running our function.'
+
+    # make sure we're not already running
+    if os.path.exists(self.pidfilename):
+      sys.stderr.write("daemon already running.\n")
+      sys.stderr.write("stop it, or remove the stale pidfile ")
+      sys.stderr.write(self.pidfilename + "\n")
+
+    # fork from the parent process and return
+    try:
+      pid = os.fork()
+      if pid > 0:
+	# todo: grab the pid and return
+	# after the daemon is running
+        return True
+    except OSError, e:
+      sys.stderr.write("initial daemonization fork failed: (%d) %s\n" %
+	(e.errno, e.strerror))
+      return False
+
+    # Decouple ourselves from the parent environment
+    # normally we chdir("/") to avoid blocking the unmount of our
+    # launch directory, but we can't do any work without it
+    # and it makes things simpler to use the cwd
+    #os.chdir("/")
+    os.umask(0)
+    os.setsid()
+
+    # Do a second fork to avoid becoming a controlling terminal
+    try:
+      pid = os.fork()
+      if pid > 0:
+        # record the child's pid
+        pidfile = open(self.pidfilename, "w")
+        pidfile.write(str(pid) + "\n")
+        pidfile.close()
+        # exit the second parent
+        sys.exit(0)
+    except OSError, e:
+      sys.stderr.write("second daemonization fork failed: (%d) %s\n" %
+	(e.errno, e.strerror))
+      sys.exit(1)
+
+    # redirect standard file descriptors
+    if not stdin: stdin="/dev/null"
+    if not stdout: stdout = self.logfilename
+    if not stderr: stderr = stdout
+    si = file(stdin, 'r')
+    so = file(stdout, 'a')
+    se = file(stderr, 'a')
+    os.dup2(si.fileno(), sys.stdin.fileno())
+    os.dup2(so.fileno(), sys.stdout.fileno())
+    os.dup2(se.fileno(), sys.stderr.fileno())
+
+    # execute the reqested main function
+    self.main()
+
+  def stop(self):
+    'Stop a running daemon.'
+    pidfile = open(self.pidfilename, "r")
+    pid = int(pidfile.readline())
+    if pid:
+      os.kill(pid, signal.SIGHUP)
+    else:
+      sys.stderr.write("no pidfile or daemon not running.\n")
+    os.unlink(self.pidfilename)
+
+def update(rev):
+  'update the executable to revision <rev>'
+  # NOTE(review): the space in "gs -r" makes the shell run
+  # "cd gs -r<rev>"; mainloop names working copies "ghostpcl-r<rev>".
+  # Confirm which directory was intended. Nothing in this file calls
+  # update().
+  workdir = "gs -r" + rev
+  # both exit statuses are captured but never checked or returned
+  svn = os.system("cd " + workdir + " && svn up -r" + rev)
+  make = os.system("cd " + workdir +" && make clean && nice make debug")
+
+def getrev(cachedir="queue.pcl"):
+  'Read the queue directory and select a revision to test'
+  if not os.path.exists(cachedir): os.mkdir(cachedir)
+  revs = os.listdir(cachedir)
+  try:
+    rev = revs[0]
+    os.unlink(os.path.join(cachedir, rev))
+  except IndexError:
+    rev = None
+  return rev
+
+def mailfile(file, rev=None):
+  'Mail out the report'
+  cmd = 'cat ' + file + ' '
+  cmd += '| mail -s "cluster regression'
+  if rev:
+    cmd += ' ghostpcl-r' + rev
+  cmd += ' (xefitra)" '
+  #cmd += 'giles at ghostscript.com'
+  cmd += 'gs-regression at ghostscript.com'
+  os.system(cmd)
+
+def ircfile(file, rev=None):
+  'Notify CIA and thus irc'
+  msg = ''.join(open(file).readlines())
+  if msg:
+    try:
+      ciatest.Message(msg, rev=rev, module='ghostpcl').send()
+    except:
+      pass
+
+def choosecluster():
+  '''Decide how many nodes of which cluster to run on.
+     returns a (cluster_name, node_count) tuple.'''
+  # figure out how many nodes are free
+  r = re.compile('^\s+(?P<cluster>\w+).*\s+(?P<procs>\d+)\s+(?P<free>\d+)\s*$')
+  clusters=[]
+  nodes = 0
+  cluster = None
+  upnodes = os.popen("upnodes")
+  for line in upnodes.readlines():
+    m = r.match(line)
+    if m: 
+      name = m.group("cluster")
+      procs = int(m.group("procs"))
+      free = int(m.group("free"))
+      # remember the cluster with the most free nodes
+      if free > nodes and name != 'total': 
+        nodes = free
+        cluster = name
+      clusters.append((name,procs,free))
+  return (cluster, nodes)
+
+def usage(name=sys.argv[0]):
+  print "Usage: %s <revision>" % name
+  print "launch a regression run on tticluster.com"
+  print "testing gs svn rev <revision> against the default baseline"
+
+def log(msg):
+  '''print a timestamped log message. We use this for major tasks,
+  and a normal print command for progress and error messages.'''
+  print '[' + time.ctime() + '] ' + msg
+
+def pbsjob(cmd, resources=None, stdout=None, stderr=None, mpi=True):
+  if not resources:
+    cluster, nodes = choosecluster()
+    if nodes > 1 and cluster == 'red' or cluster == 'green':
+      # red reports two cpus per node
+      nodes /= 2
+      ppn = ':ppn=2'
+    else:
+      ppn = ''
+    resources = 'nodes=%d:%s:run%s,walltime=20:00' % (nodes, cluster, ppn)
+    print 'requesting', nodes, 'nodes on', cluster
+  f = open('regress.pbs', 'w')
+  f.write('#PBS -l ' + resources)
+  if stdout:
+    f.write(' -o ' + stdout)
+    if stdout == stderr:
+      f.write(' -j oe')
+    elif stderr:
+      f.write(' -e ' + stderr)
+  f.write(' -d ' + os.getcwd())
+  f.write('\n\n')
+  if mpi:
+    f.write('mpiexec -comm mpich2-pmi ')
+    f.write(' -nostdin -kill -nostdout')
+    f.write(' ')
+  f.write(cmd)
+  f.write('\n')
+  f.close()
+  os.system('qsub regress.pbs')
+
+def build(workdir=None, clean=False):
+  '''compile an executable from the current source
+
+  workdir, when given, is the directory to build in. With clean set
+  the tree is cleaned and autogen.sh re-run before make.
+  Returns False when a failure is detected, True otherwise.'''
+  if clean:
+    cmd = "make clean && nice ./autogen.sh && nice make"
+  else:
+    cmd = "nice make"
+  if workdir:
+    cmd = "cd " + workdir + " && " + cmd
+  # hard-wired switch between a local build and a build on a compile node
+  if False:
+    # build on the dispatch host
+    make = os.system(cmd)
+    # the exit code is in the high byte of the wait status
+    make = make >> 8
+  else:
+    # FIXME: alternate build on a compile node
+    report = 'update.log'
+    resources = 'nodes=1:compile'
+    cmd += "\nexit"
+    # the report file doubles as the completion marker, so clear it first
+    if os.path.exists(report): os.unlink(report)
+    # NOTE(review): pbsjob() has no return statement, so make is None
+    # here and the failure check below cannot fire on this path
+    make = pbsjob(cmd, resources, stdout=report, mpi=False)
+    # block until the job's output file shows up
+    while not os.path.exists(report): time.sleep(5)
+  if make:
+    log("build failed! exit code " + str(make))
+    return False
+  # update successful
+  return True
+
+
+def mainloop():
+  '''Serve the ghostpcl queue forever.
+
+  For each queued revision: build the existing ghostpcl-r<rev> working
+  copy, submit a regression job through qsub, wait for its report
+  file to appear, send the report to irc, and copy the working copy's
+  reg_baseline.txt back to the current directory. Sleeps 100 seconds
+  whenever the queue is empty.'''
+  log("starting up")
+  # doing is True while the idle message has yet to be printed
+  doing = True
+  while True:
+    rev = getrev()
+    if rev:
+      doing = True
+      workdir = "ghostpcl-r" + rev
+      # the working copy must already exist; nothing here creates it,
+      # and getrev() has already removed the entry from the queue
+      if not os.path.exists(workdir):
+        print "couldn't find requested working copy '%s'\n" % workdir
+        continue
+      log("building " + workdir)
+      # the result of build() is not checked
+      build(workdir)
+      log("build complete")
+      # seed the working copy with the shared baseline if it has none
+      if not os.path.exists(os.path.join(workdir, "reg_baseline.txt")):
+        os.system("cp reg_baseline.txt " + workdir)
+      log("running regression on ghostpcl-r" + rev)
+      start = time.time()
+      report = "regression-r" + rev + ".log"
+      # remove the report if it exists since we use this to check completion
+      if os.path.exists(report): os.unlink(report)
+      # write the job script by hand rather than through pbsjob(): the
+      # job's working directory here is the working copy, not the cwd
+      f = open('regress.pbs', 'w')
+      cluster, nodes = choosecluster()
+      if nodes > 1 and (cluster == 'red' or cluster == 'green'):
+        # red reports two cpus per node
+        nodes /= 2
+        ppn = ':ppn=2'
+      else:
+        ppn = ''
+      f.write('#PBS -l nodes=%s:%s:run%s,walltime=20:00' % 
+        (nodes, cluster, ppn))
+      f.write(' -o ' + report)
+      #f.write(' -j oe')
+      #f.write(' -e /dev/null')
+      f.write(' -e ' + report + '.err')
+      f.write(' -d ' + os.path.join(os.getcwd(), workdir))
+      f.write('\n\n')
+      f.write('mpiexec -comm mpich2-pmi ')
+      f.write(' -nostdin -kill -nostdout')
+      f.write(' bwpython ../regress.py')
+      f.write(' --batch --update')
+      f.write(' --exe main/obj/pcl6')
+      f.write('\n')
+      f.close()
+      print 'requesting', nodes, 'nodes on', cluster
+      os.system('qsub regress.pbs')
+      # wait for the run to finish
+      while not os.path.exists(report):
+        time.sleep(20)
+      print "report is ready as '" + report + "'. total time %d seconds" % int(time.time() - start)
+      ircfile(report, rev)
+      #mailfile(report, rev)
+      # copy the working copy's baseline back to the current directory
+      os.system("cp " + os.path.join(workdir, "reg_baseline.txt ") + " .")
+    else:
+      # print the idle message only once per idle period
+      if doing:
+        print "-- nothing to do --"
+        sys.stdout.flush()
+        doing = False
+      time.sleep(100)
+
+if __name__ == '__main__':
+  if len(sys.argv) > 1 and sys.argv[1] == '-d':
+    daemon = Daemon(mainloop)
+    if len(sys.argv) > 2 and sys.argv[2] == 'stop':
+      result = daemon.stop()
+    else:
+      result = daemon.start()
+      if not result:
+        print "couldn't start daemon!"
+        sys.exit(1)
+  else:
+    # don't run as a daemon by default
+    mainloop()


Property changes on: trunk/cluster/tti/reg_dispatch-pcl.py
___________________________________________________________________
Name: svn:executable
   + *

Added: trunk/cluster/tti/reg_dispatch.py
===================================================================
--- trunk/cluster/tti/reg_dispatch.py	2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/tti/reg_dispatch.py	2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,272 @@
+#!/usr/bin/env python
+
+# regression test dispatch script
+# we can be used to launch a parallel regression
+# or in daemon mode to run regressions on specific revisions
+
+import os, sys, signal
+import re, time
+import ciatest
+
+class Daemon:
+  '''Class for forking off a daemon process.'''
+  def __init__(self, main):
+    self.main = main
+    self.running = False
+    self.pidfilename = "reg_dispatch.pid"
+    self.logfilename = "reg_dispatch.log"
+  def start(self, stdin=None, stdout=None, stderr=None):
+    'Fork off a separate process, running our function.'
+
+    # make sure we're not already running
+    if os.path.exists(self.pidfilename):
+      sys.stderr.write("daemon already running.\n")
+      sys.stderr.write("stop it, or remove the stale pidfile ")
+      sys.stderr.write(self.pidfilename + "\n")
+
+    # fork from the parent process and return
+    try:
+      pid = os.fork()
+      if pid > 0:
+	# todo: grab the pid and return
+	# after the daemon is running
+        return True
+    except OSError, e:
+      sys.stderr.write("initial daemonization fork failed: (%d) %s\n" %
+	(e.errno, e.strerror))
+      return False
+
+    # Decouple ourselves from the parent environment
+    # normally we chdir("/") to avoid blocking the unmount of our
+    # launch directory, but we can't do any work without it
+    # and it makes things simpler to use the cwd
+    #os.chdir("/")
+    os.umask(0)
+    os.setsid()
+
+    # Do a second fork to avoid becoming a controlling terminal
+    try:
+      pid = os.fork()
+      if pid > 0:
+        # record the child's pid
+        pidfile = open(self.pidfilename, "w")
+        pidfile.write(str(pid) + "\n")
+        pidfile.close()
+        # exit the second parent
+        sys.exit(0)
+    except OSError, e:
+      sys.stderr.write("second daemonization fork failed: (%d) %s\n" %
+	(e.errno, e.strerror))
+      sys.exit(1)
+
+    # redirect standard file descriptors
+    if not stdin: stdin="/dev/null"
+    if not stdout: stdout = self.logfilename
+    if not stderr: stderr = stdout
+    si = file(stdin, 'r')
+    so = file(stdout, 'a')
+    se = file(stderr, 'a')
+    os.dup2(si.fileno(), sys.stdin.fileno())
+    os.dup2(so.fileno(), sys.stdout.fileno())
+    os.dup2(se.fileno(), sys.stderr.fileno())
+
+    # execute the reqested main function
+    self.main()
+
+  def stop(self):
+    'Stop a running daemon.'
+    pidfile = open(self.pidfilename, "r")
+    pid = int(pidfile.readline())
+    if pid:
+      os.kill(pid, signal.SIGHUP)
+    else:
+      sys.stderr.write("no pidfile or daemon not running.\n")
+    os.unlink(self.pidfilename)
+
+# PBS job server utilities
+
+def choosecluster():
+  '''Decide how many nodes of which cluster to run on.
+     returns a (cluster_name, node_count) tuple.'''
+  # figure out how many nodes are free
+  r = re.compile('^\s+(?P<cluster>\w+).*\s+(?P<procs>\d+)\s+(?P<free>\d+)\s*$')
+  clusters=[]
+  nodes = 0
+  upnodes = os.popen("upnodes")
+  for line in upnodes.readlines():
+    m = r.match(line)
+    if m: 
+      name = m.group("cluster")
+      procs = int(m.group("procs"))
+      free = int(m.group("free"))
+      # remember the cluster with the most free nodes
+      if free > nodes and name != 'total': 
+        nodes = free
+        cluster = name
+      clusters.append((name,procs,free))
+  return (cluster, nodes)
+
+def pbsjob(cmd, resources=None, stdout=None, stderr=None, mpi=True):
+  if not resources:
+    cluster, nodes = choosecluster()
+    if nodes > 1 and cluster == 'red' or cluster == 'green':
+      # red reports two cpus per node
+      nodes /= 2
+      ppn = ':ppn=2'
+    else:
+      ppn = ''
+    resources = 'nodes=%d:%s:run%s,walltime=20:00' % (nodes, cluster, ppn)
+    print 'requesting', nodes, 'nodes on', cluster
+  if stdout: jobname = stdout + '.pbs'
+  else: jobname = 'regress.pbs'
+  f = open(jobname, 'w')
+  f.write('#PBS -l ' + resources)
+  if stdout:
+    f.write(' -o ' + stdout)
+    if stdout == stderr:
+      f.write(' -j oe')
+    elif stderr:
+      f.write(' -e ' + stderr)
+  f.write(' -d ' + os.getcwd())
+  f.write('\n\n')
+  if mpi:
+    f.write('mpiexec -comm mpich2-pmi ')
+    f.write(' -nostdin -kill -nostdout')
+    f.write(' ')
+  f.write(cmd)
+  f.write('\n')
+  f.close()
+  os.system('qsub ' + jobname)
+
+
+# regression setup and reporting
+
+def update(rev):
+  'update the source to revision <rev>'
+  svn = os.system("svn up -r" + rev)
+  if svn:
+    log("SVN update failed!")
+    return False
+  return True
+
+def build(clean=False):
+  '''compile an executable from the current source
+
+  With clean set the tree is cleaned and autogen.sh re-run before
+  make. Returns False when a failure is detected, True otherwise.'''
+  if clean:
+    cmd = "make clean && nice ./autogen.sh && nice make"
+  else:
+    cmd = "nice make"
+  # hard-wired switch between a local build and a build on a compile node
+  if False:
+    # build on the dispatch host
+    make = os.system(cmd)
+    # the exit code is in the high byte of the wait status
+    make = make >> 8
+  else:
+    # build on a compile node
+    resources = 'nodes=1:compile'
+    report = 'update.log'
+    # the report file doubles as the completion marker, so clear it first
+    if os.path.exists(report): os.unlink(report)
+    # NOTE(review): pbsjob() has no return statement, so make is None
+    # here and the failure check below cannot fire on this path
+    make = pbsjob(cmd, resources, stdout=report, stderr=report, mpi=False)
+    # block until the job's output file shows up
+    while not os.path.exists(report):
+      time.sleep(5)
+  if make:
+    log("build failed! exit code " + str(make))
+    return False
+  # update successful
+  return True
+
+def getrev(cachedir="../queue.gs"):
+  'Read the queue directory and select a revision to test'
+  revs = os.listdir(cachedir)
+  # we would ideally sort by mtime, but for now just alphabetical
+  revs.sort()
+  try:
+    rev = revs[0]
+    os.unlink(os.path.join(cachedir, rev))
+  except IndexError:
+    rev = None
+  return rev
+
+def mailfile(file, rev=None):
+  'Mail out the report'
+  cmd = 'cat ' + file + ' '
+  cmd += '| mail -s "cluster regression'
+  if rev:
+    cmd += ' gs-r' + rev
+  cmd += ' (xefitra)" '
+  #cmd += 'giles at ghostscript.com'
+  cmd += 'gs-regression at ghostscript.com'
+  os.system(cmd)
+
+def irclog(msg, rev=None):
+  'Notify CIA and thus irc of a message'
+  if msg:
+    try:
+      ciatest.Message(msg, rev).send()
+    except:
+      # ignore errors, the server sometimes barfs
+      pass
+
+def ircfile(file, rev=None):
+  'Send a result file to CIA and thus irc'
+  msg = ''.join(open(file).readlines())
+  irclog(msg, rev)
+
+def usage(name=sys.argv[0]):
+  print "Usage: %s <revision>" % name
+  print "launch a regression run on tticluster.com"
+  print "testing gs svn rev <revision> against the default baseline"
+
+def log(msg):
+  print '[' + time.ctime() + '] ' + msg
+
+def runrev(rev=None, report=None):
+  if not rev: rev = getrev()
+  if not report: report = "regression-r" + rev + ".log"
+  log("running regression on gs-r" + rev)
+  start = time.time()
+  # remove the report if it exists since we use this to check completion
+  if os.path.exists(report): os.unlink(report)
+  if not update(rev): irclog("SVN update failed!", rev)
+  elif not build(clean=True): irclog("Build failed!", rev)
+  else:
+    cmd = 'bwpython ../regress.py --batch --update'
+    pbsjob(cmd, resources=None, stdout=report)
+    # wait for the run to finish
+    while not os.path.exists(report):
+      time.sleep(20)
+    print "report is ready as '" + report + "'. total time %d seconds" % int(time.time() - start)
+    mailfile(report, rev)
+    ircfile(report, rev)
+
+def mainloop():
+  log("starting up")
+  doing = True
+  while True:
+    rev = getrev()
+    if rev:
+      doing = True
+      report = "regression-r" + rev + ".log"
+      runrev(rev, report)
+    else:
+      if doing:
+        print "-- nothing to do --"
+        sys.stdout.flush()
+        doing = False
+      time.sleep(100)
+
+if __name__ == '__main__':
+  if len(sys.argv) > 1 and sys.argv[1] == '-d':
+    daemon = Daemon(mainloop)
+    if len(sys.argv) > 2 and sys.argv[2] == 'stop':
+      result = daemon.stop()
+    else:
+      result = daemon.start()
+      if not result:
+        print "couldn't start daemon!"
+        sys.exit(1)
+  elif len(sys.argv) > 2 and sys.argv[1] == '-r':
+    # run a specific revision and quit
+    rev = sys.argv[2]
+    runrev(rev)
+  else:
+    # run with queues, but on the console for debugging
+    mainloop()


Property changes on: trunk/cluster/tti/reg_dispatch.py
___________________________________________________________________
Name: svn:executable
   + *

Added: trunk/cluster/tti/regress.py
===================================================================
--- trunk/cluster/tti/regress.py	2007-07-15 13:45:17 UTC (rev 8125)
+++ trunk/cluster/tti/regress.py	2007-07-15 14:04:35 UTC (rev 8126)
@@ -0,0 +1,395 @@
+#!/usr/bin/env python
+
+import os
+import time
+import sys
+
+# Use real MPI when mpi4py can be imported; otherwise install a stand-in
+# so the rest of the module can still read MPI.size and MPI.rank.
+try:
+  from mpi4py import MPI
+except ImportError:
+  class DummyMPI:
+    '''A dummy MPI class for running serial jobs.
+
+    Only size and rank are provided, describing a single process.
+    '''
+    size = 1
+    rank = 0
+  MPI = DummyMPI()
+
+class Conf:
+  def __init__(self):
+    # set defaults
+    self.batch = False
+    self.update = False
+    self.verbose = False
+    self.testpath = os.path.join(os.environ['HOME'], 'tests')
+    #self.exe = './language_switch/obj/pspcl6'
+    self.exe = './bin/gs -q -I$HOME/fonts'
+    self.test = 'comparefiles'
+    self.device = 'ppmraw'
+    self.dpi = 600
+
+  def parse(self, args):
+    '''Parse the command line for configuration switches
+
+    For example:
+      conf = Conf()
+      conf.parse(sys.argv)
+    '''
+
+    for index in xrange(1,len(args)):
+      arg = args[index]
+      if arg[:2] == '--':
+
+        # support generic '--opt=val'
+        sep = arg.find('=')
+        if sep > 0:
+          opt = arg[2:sep]
+          val = arg[sep+1:]
+        else:
+          opt = arg[2:]
+
+          # for select options support '--opt val'
+          if opt in ('exe', 'test'):
+            try:
+	      val = args[index+1]
+            except IndexError:
+	      print 'Warning:', opt, 'requires a specific value.'
+	      val = None
+          else:
+	    # default to postitive boolean value
+            val = True
+
+        # for select options, accumulate the values
+        if opt in ('test'):
+          opt += 's' # pluralize collections
+          if not hasattr(self, opt):
+            self.__dict__[opt] = []
+          self.__dict__[opt].append(val)
+        else:
+	  # set an attribute on ourselves with the option value
+          self.__dict__[opt] = val
+ 
+    # finally, set defaults for unset accumulating options
+    if not hasattr(self, 'tests'):
+      self.tests = []
+      # guess appropriate defaults based on the executable
+      basename = os.path.basename(self.exe.split()[0])
+      if basename.find('pcl') >= 0:
+        self.tests += ['pcl/pcl5cfts/fts.*',
+	'pcl/pcl5efts/fts.*', 
+	'pcl/pcl5ccet/*.BIN']
+      if basename.find('ps') >= 0 or basename.find('gs') >= 0:
+	self.tests += ['ps/ps3cet/*.PS']
+        # run the normal comparefiles suite for now
+        self.tests = ['comparefiles/*.ps', 
+		 'comparefiles/*.pdf', 
+		 'comparfiles/*.ai']
+
+# global configuration instance
+# (filled in from sys.argv as soon as the module is loaded; the suite and
+# test classes below read it directly)
+conf = Conf()
+conf.parse(sys.argv)
+
+
+# results of tests are stored as classes
+
+class TestResult:
+  '''generic test result class
+
+  msg is optional detail attached by whoever creates the result.  The
+  string form of the base class is 'no result'.
+  '''
+  def __init__(self, msg=None):
+    self.msg = msg
+  def __str__(self):
+    return 'no result'
+
+class OKResult(TestResult):
+  'result class for successful tests; prints as "ok"'
+  def __str__(self):
+    return 'ok'
+
+class FailResult(TestResult):
+  'result class for failed tests; prints as "FAIL"'
+  def __str__(self):
+    return 'FAIL'
+
+class ErrorResult(TestResult):
+  'result class for tests that did not complete; prints as "ERROR"'
+  def __str__(self):
+    return 'ERROR'
+
+class NewResult(TestResult):
+  '''result class for tests that are new and have no expected result
+
+  The string form includes msg in parentheses.
+  '''
+  def __str__(self):
+    return 'new (%s)' % self.msg
+
+class SelfTest:
+  '''generic class for self tests
+
+  result is None until run() has been called.  The base implementation
+  of run() always records an OKResult.
+  '''
+  def __init__(self):
+    self.result = None
+  def description(self):
+    'returns a short name for the test'
+    return "generic self test"
+  def run(self):
+    'call this to execute the test'
+    self.result = OKResult()
+
+class SelfTestSuite:
+  '''Generic class for running a collection of SelfTest instances.'''
+
+  def __init__(self, stream=sys.stderr):
+    self.stream = stream
+    self.tests = []
+    self.fails = []
+    self.errors = []
+    self.news = []
+    self.elapsed = 0.0
+
+  def addTest(self, test):
+    self.tests.append(test)
+
+  def addResult(self, test):
+    if test:
+      if not conf.batch:
+        print test.description() + ' ... ' + str(test.result)
+      self.tests.append(test)
+      if isinstance(test.result, ErrorResult):
+        self.errors.append(test)
+      elif isinstance(test.result, NewResult):
+        self.news.append(test)
+      elif not isinstance(test.result, OKResult):
+        # treat everything else as a failure
+        self.fails.append(test)
+
+  def run(self):
+    '''Run each test in sequence.'''
+    starttime = time.time()
+    tests = self.tests
+    self.tests = []
+    for test in tests:
+      test.run()
+      self.addResult(test)
+    self.elapsed = time.time() - starttime
+    self.report()
+
+  def report(self):
+    if not conf.batch:
+      print '-'*72
+    print 'ran %d tests in %.3f seconds on %d nodes\n' % \
+	(len(self.tests), self.elapsed, MPI.size)
+    if self.fails:
+      print 'FAILED %d of %d tests' % \
+	(len(self.fails),len(self.tests))
+      if conf.batch:
+        for test in self.fails:
+          print '  ' + test.file
+        print
+    if self.errors:
+      print 'ERROR running %d of %d tests' % \
+	(len(self.errors),len(self.tests))
+      if conf.batch:
+        for test in self.errors:
+          print '  ' + test.description()
+          print test.result.msg
+        print
+    if not self.fails and not self.errors and not self.news:
+      print 'PASSED all %d tests' % len(self.tests)
+    if self.news:
+      print '%d NEW files with no previous result' % len(self.news)
+    print
+
+class MPITestSuite(SelfTestSuite):
+  '''Use MPI to run multiple tests in parallel.'''
+
+  def run(self):
+    starttime = time.time()
+    if MPI.rank > 0:
+      # daughter nodes run requested tests
+      test = None
+      while True:
+        MPI.COMM_WORLD.Send(test, dest=0)
+        test = MPI.COMM_WORLD.Recv(source=0)
+        if not test:
+          break
+        test.run()
+    else:
+      # mother node hands out work and reports
+      tests = self.tests
+      self.tests = []
+      while tests:
+        status = MPI.Status()
+        test = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE, status=status)
+        self.addResult(test)
+        MPI.COMM_WORLD.Send(tests.pop(0), dest=status.source)
+      # retrieve outstanding results and tell the nodes we're finished
+      for node in xrange(1, MPI.size):
+        test = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
+	self.addResult(test)
+        MPI.COMM_WORLD.Send(None, dest=node)
+    stoptime = time.time()
+    self.elapsed = stoptime - starttime
+    if MPI.rank == 0:
+      self.report()
+
+# specific code for our needs
+
+class md5Test(SelfTest):
+  '''Test class for running a file and comparing the output to an
+  expected value.'''
+
+  def __init__(self, file, md5sum, dpi=600, device="ppmraw"):
+    SelfTest.__init__(self)
+    self.file = file
+    self.md5sum = md5sum
+    self.dpi = dpi
+    self.exe = conf.exe
+    self.opts = "-dQUIET -dSAFER -dNOPAUSE -dBATCH -K1000000"
+    self.opts += " -dSAFER -dBATCH"
+    self.opts += " -Z@"
+    self.opts += " -sDEVICE=%s -r%d" % (device, dpi)
+    #self.psopts = '-dMaxBitmap=40000000 -dJOBSERVER ./lib/gs_cet.ps'
+    self.psopts = '-dMaxBitmap=30000000 -dNOOUTERSAVE -dJOBSERVER -c false 0 startjob pop -f'
+
+  def description(self):
+    return 'Checking ' + self.file
+
+  def run(self):
+    scratch = os.path.join('/tmp', os.path.basename(self.file) + '.md5')
+    # add psopts if it's a postscript file
+    if self.file[-3:].lower() == '.ps' or \
+	self.file[-4:].lower() == '.eps' or \
+        self.file[-4:].lower() == '.pdf' or \
+        self.file[-3:].lower() == '.ai':
+      cmd = '%s %s -sOutputFile="|md5sum>%s" %s - < %s ' % \
+	(self.exe, self.opts, scratch, self.psopts, self.file)
+    else:
+      cmd = '%s %s -sOutputFile="|md5sum>%s" %s' % \
+	(self.exe, self.opts, scratch, self.file)
+    run = os.popen(cmd)
+    msg = run.readlines()
+    code = run.close()
+    if code:
+      self.result = ErrorResult(''.join(msg))
+      return
+    try:
+      checksum = open(scratch)
+      md5sum = checksum.readline().split()[0]
+      checksum.close()
+      os.unlink(scratch)
+    except IOError:
+      self.result = ErrorResult('no output')
+      return
+    if not self.md5sum:
+      self.result = NewResult(md5sum)
+      return
+    if self.md5sum == md5sum:
+      self.result = OKResult(md5sum)
+    else:
+      self.result = FailResult(md5sum)
+
+class DB:
+  '''class representing an md5 sum database'''
+
+  def __init__(self):
+    self.store = None
+    self.db = {}
+
+  def load(self, store='reg_baseline.txt'):
+    self.store = store
+    try:
+      f = open(self.store)
+    except IOError:
+      print 'WARNING: could not open baseline database', self.store
+      return
+    for line in f.readlines():
+      if line[:1] == '#': continue
+      fields = line.split()
+      try:
+        file = fields[0].strip()
+        md5sum = fields[1].strip()
+        self.db[file] = md5sum
+      except IndexError:
+        pass
+    f.close()
+
+  def save(self, store=None):
+    if not store:
+      store = self.store
+    f = open(store, 'w')
+    f.write('# regression test baseline\n')
+    for key in self.db.keys():
+      f.write(str(key) + ' ' + str(self.db[key]) + '\n')
+    f.close()
+
+  # provide a dictionary interface
+  def __getitem__(self, key):
+    try:
+      value = self.db[key]
+    except KeyError:
+      value = None
+    return value
+
+  def __setitem__(self, key, value):
+    self.db[key] = value
+
+
+def run_regression():
+  'run normal set of regressions'
+  from glob import glob
+  if MPI.size > 1:
+    suite = MPITestSuite()
+  else:
+    suite = SelfTestSuite()
+  if MPI.rank == 0:
+    db = DB()
+    db.load()
+    for test in conf.tests:
+      for file in glob(os.path.join(conf.testpath,test)):
+        suite.addTest(md5Test(file, db[file], conf.dpi, conf.device))
+    if MPI.size > 1 and not conf.batch:
+      print 'running tests on %d nodes...' % MPI.size
+  suite.run()
+  if MPI.rank == 0:
+    # update the database with new files and save
+    for test in suite.news:
+      db[test.file] = test.result.msg
+    if conf.update:
+      if len(suite.fails):
+        print 'Updating baselines for the failed tests.'
+      for test in suite.fails:
+        db[test.file] = test.result.msg
+    db.save()
+
+
+# self test routines for the self test classes
+
+class RandomTest(SelfTest):
+  'test class with random results for testing'
+  def description(self):
+    return 'random test result'
+  def run(self):
+    import random
+    options = ( OKResult(), FailResult(), ErrorResult(), TestResult() )
+    r = random.Random()
+    r.seed()
+    self.result = r.choice(options)
+
+def test_ourselves():
+  print 'testing a single test:'
+  suite = SelfTestSuite()
+  suite.addTest(SelfTest())
+  suite.run()
+  print 'testing a set of tests:'
+  suite = SelfTestSuite()
+  for count in range(8):
+    suite.addTest(SelfTest())
+  suite.run()
+  print 'testing random results:'
+  suite = SelfTestSuite()
+  import random
+  r = random.Random()
+  r.seed()
+  for count in range(4 + int(r.random()*12)):
+    suite.addTest(RandomTest())
+  suite.run()
+
+
+# Do something useful when executed directly
+
+if __name__ == '__main__':
+  # the self-test entry point is left disabled; executing the file runs
+  # the regression suite
+  #test_ourselves()
+  run_regression()



More information about the gs-cvs mailing list