cpython/Demo/sgi/video/VCR.py

535 lines
11 KiB
Python
Raw Normal View History

1993-05-12 09:35:44 -03:00
import fcntl
import IOCTL
from IOCTL import *
import sys
import struct
import select
import posix
import time
1993-05-12 09:35:44 -03:00
DEVICE='/dev/ttyd2'
class UnixFile:
def open(self, name, mode):
self.fd = posix.open(name, mode)
return self
def read(self, len):
return posix.read(self.fd, len)
def write(self, data):
dummy = posix.write(self.fd, data)
def flush(self):
pass
def fileno(self):
return self.fd
def close(self):
dummy = posix.close(self.fd)
1993-05-12 09:35:44 -03:00
def packttyargs(*args):
if type(args) <> type(()):
raise 'Incorrect argtype for packttyargs'
if type(args[0]) == type(1):
iflag, oflag, cflag, lflag, line, chars = args
elif type(args[0]) == type(()):
if len(args) <> 1:
raise 'Only 1 argument expected'
iflag, oflag, cflag, lflag, line, chars = args[0]
elif type(args[0]) == type([]):
if len(args) <> 1:
raise 'Only 1 argument expected'
[iflag, oflag, cflag, lflag, line, chars] = args[0]
str = struct.pack('hhhhb', iflag, oflag, cflag, lflag, line)
for c in chars:
str = str + c
return str
def nullttyargs():
chars = ['\0']*IOCTL.NCCS
return packttyargs(0, 0, 0, 0, 0, chars)
def unpackttyargs(str):
args = str[:-IOCTL.NCCS]
rawchars = str[-IOCTL.NCCS:]
chars = []
for c in rawchars:
chars.append(c)
iflag, oflag, cflag, lflag, line = struct.unpack('hhhhb', args)
return (iflag, oflag, cflag, lflag, line, chars)
def initline(name):
fp = UnixFile().open(name, 2)
ofp = fp
1993-05-12 09:35:44 -03:00
fd = fp.fileno()
rv = fcntl.ioctl(fd, IOCTL.TCGETA, nullttyargs())
iflag, oflag, cflag, lflag, line, chars = unpackttyargs(rv)
iflag = iflag & ~(INPCK|ISTRIP|INLCR|IUCLC|IXON|IXOFF)
oflag = oflag & ~OPOST
cflag = B9600|CS8|CREAD|CLOCAL
lflag = lflag & ~(ISIG|ICANON|ECHO|TOSTOP)
chars[VMIN] = chr(1)
chars[VTIME] = chr(0)
arg = packttyargs(iflag, oflag, cflag, lflag, line, chars)
dummy = fcntl.ioctl(fd, IOCTL.TCSETA, arg)
return fp, ofp
#
#
1993-05-12 09:35:44 -03:00
error = 'VCR.error'
# Commands/replies:
COMPLETION = '\x01'
ACK ='\x0a'
NAK ='\x0b'
NUMBER_N = 0x30
ENTER = '\x40'
EXP_7= '\xde'
EXP_8= '\xdf'
CL ='\x56'
CTRL_ENABLE = EXP_8 + '\xc6'
SEARCH_DATA = EXP_8 + '\x93'
ADDR_SENSE = '\x60'
PLAY ='\x3a'
STOP ='\x3f'
EJECT='\x2a'
FF ='\xab'
REW ='\xac'
STILL='\x4f'
STEP_FWD ='\x2b' # Was: '\xad'
1993-05-12 09:35:44 -03:00
FM_SELECT=EXP_8 + '\xc8'
FM_STILL=EXP_8 + '\xcd'
PREVIEW=EXP_7 + '\x9d'
REVIEW=EXP_7 + '\x9e'
1993-05-12 09:35:44 -03:00
DM_OFF=EXP_8 + '\xc9'
DM_SET=EXP_8 + '\xc4'
FWD_SHUTTLE='\xb5'
REV_SHUTTLE='\xb6'
EM_SELECT=EXP_8 + '\xc0'
N_FRAME_REC=EXP_8 + '\x92'
SEARCH_PREROLL=EXP_8 + '\x90'
EDIT_PB_STANDBY=EXP_8 + '\x96'
EDIT_PLAY=EXP_8 + '\x98'
AUTO_EDIT=EXP_7 + '\x9c'
IN_ENTRY=EXP_7 + '\x90'
IN_ENTRY_RESET=EXP_7 + '\x91'
IN_ENTRY_SET=EXP_7 + '\x98'
IN_ENTRY_INC=EXP_7 + '\x94'
IN_ENTRY_DEC=EXP_7 + '\x95'
IN_ENTRY_SENSE=EXP_7 + '\x9a'
OUT_ENTRY=EXP_7 + '\x92'
OUT_ENTRY_RESET=EXP_7 + '\x93'
OUT_ENTRY_SET=EXP_7 + '\x99'
OUT_ENTRY_INC=EXP_7 + '\x96'
OUT_ENTRY_DEC=EXP_7 + '\x98'
OUT_ENTRY_SENSE=EXP_7 + '\x9b'
MUTE_AUDIO = '\x24'
MUTE_AUDIO_OFF = '\x25'
MUTE_VIDEO = '\x26'
MUTE_VIDEO_OFF = '\x27'
MUTE_AV = EXP_7 + '\xc6'
MUTE_AV_OFF = EXP_7 + '\xc7'
DEBUG=0
1993-05-12 09:35:44 -03:00
1993-06-08 09:47:06 -03:00
class VCR:
def __init__(self):
1993-05-12 09:35:44 -03:00
self.ifp, self.ofp = initline(DEVICE)
self.busy_cmd = None
self.async = 0
self.cb = None
self.cb_arg = None
1993-05-12 09:35:44 -03:00
def _check(self):
if self.busy_cmd:
raise error, 'Another command active: '+self.busy_cmd
def _endlongcmd(self, name):
if not self.async:
self.waitready()
return 1
self.busy_cmd = name
return 'VCR BUSY'
def fileno(self):
return self.ifp.fileno()
def setasync(self, async):
self.async = async
def setcallback(self, cb, arg):
self.setasync(1)
self.cb = cb
self.cb_arg = arg
def poll(self):
if not self.async:
raise error, 'Can only call poll() in async mode'
if not self.busy_cmd:
return
if self.testready():
if self.cb:
apply(self.cb, self.cb_arg)
1993-05-12 09:35:44 -03:00
def _cmd(self, cmd):
if DEBUG:
print '>>>',`cmd`
1993-05-12 09:35:44 -03:00
self.ofp.write(cmd)
self.ofp.flush()
def _waitdata(self, len, tout):
rep = ''
while len > 0:
if tout == None:
ready, d1, d2 = select.select( \
[self.ifp], [], [])
else:
ready, d1, d2 = select.select( \
[self.ifp], [], [], tout)
1993-05-12 09:35:44 -03:00
if ready == []:
## if rep:
## print 'FLUSHED:', `rep`
return None
data = self.ifp.read(1)
if DEBUG:
print '<<<',`data`
1993-05-12 09:35:44 -03:00
if data == NAK:
return NAK
rep = rep + data
len = len - 1
return rep
def _reply(self, len):
data = self._waitdata(len, 10)
if data == None:
raise error, 'Lost contact with VCR'
return data
def _getnumber(self, len):
data = self._reply(len)
number = 0
for c in data:
digit = ord(c) - NUMBER_N
if digit < 0 or digit > 9:
raise error, 'Non-digit in number'+`c`
number = number*10 + digit
return number
def _iflush(self):
dummy = self._waitdata(10000, 0)
1993-05-12 09:35:44 -03:00
## if dummy:
## print 'IFLUSH:', dummy
def simplecmd(self,cmd):
self._iflush()
1993-05-12 09:35:44 -03:00
for ch in cmd:
self._cmd(ch)
rep = self._reply(1)
if rep == NAK:
return 0
elif rep <> ACK:
raise error, 'Unexpected reply:' + `rep`
return 1
def replycmd(self, cmd):
if not self.simplecmd(cmd[:-1]):
return 0
self._cmd(cmd[-1])
1993-05-12 09:35:44 -03:00
def _number(self, number, digits):
if number < 0:
raise error, 'Unexpected negative number:'+ `number`
maxnum = pow(10, digits)
if number > maxnum:
raise error, 'Number too big'
while maxnum > 1:
number = number % maxnum
maxnum = maxnum / 10
digit = number / maxnum
ok = self.simplecmd(chr(NUMBER_N + digit))
if not ok:
raise error, 'Error while transmitting number'
def initvcr(self, *optarg):
timeout = None
if optarg <> ():
timeout = optarg[0]
starttime = time.time()
self.busy_cmd = None
1993-05-12 09:35:44 -03:00
self._iflush()
while 1:
## print 'SENDCL'
self._cmd(CL)
rep = self._waitdata(1, 2)
## print `rep`
if rep in ( None, CL, NAK ):
if timeout:
if time.time() - starttime > timeout:
raise error, \
'No reply from VCR'
1993-05-12 09:35:44 -03:00
continue
break
if rep <> ACK:
raise error, 'Unexpected reply:' + `rep`
dummy = self.simplecmd(CTRL_ENABLE)
def waitready(self):
rep = self._waitdata(1, None)
if rep == None:
raise error, 'Unexpected None reply from waitdata'
if rep not in (COMPLETION, ACK):
self._iflush()
raise error, 'Unexpected waitready reply:' + `rep`
self.busy_cmd = None
def testready(self):
rep = self._waitdata(1, 0)
1993-05-12 09:35:44 -03:00
if rep == None:
return 0
1993-05-12 09:35:44 -03:00
if rep not in (COMPLETION, ACK):
self._iflush()
raise error, 'Unexpected waitready reply:' + `rep`
self.busy_cmd = None
return 1
1993-05-12 09:35:44 -03:00
def play(self): return self.simplecmd(PLAY)
def stop(self): return self.simplecmd(STOP)
def ff(self): return self.simplecmd(FF)
def rew(self): return self.simplecmd(REW)
def eject(self):return self.simplecmd(EJECT)
def still(self):return self.simplecmd(STILL)
def step(self): return self.simplecmd(STEP_FWD)
def goto(self, (h, m, s, f)):
if not self.simplecmd(SEARCH_DATA):
return 0
self._number(h, 2)
self._number(m, 2)
self._number(s, 2)
self._number(f, 2)
if not self.simplecmd(ENTER):
return 0
return self._endlongcmd('goto')
1993-05-12 09:35:44 -03:00
# XXXX TC_SENSE doesn't seem to work
def faulty_where(self):
self._check()
1993-05-12 09:35:44 -03:00
self._cmd(TC_SENSE)
h = self._getnumber(2)
m = self._getnumber(2)
s = self._getnumber(2)
f = self._getnumber(2)
return (h, m, s, f)
def where(self):
return self.addr2tc(self.sense())
def sense(self):
self._check()
1993-05-12 09:35:44 -03:00
self._cmd(ADDR_SENSE)
num = self._getnumber(5)
return num
def addr2tc(self, num):
f = num % 25
num = num / 25
s = num % 60
num = num / 60
m = num % 60
h = num / 60
return (h, m, s, f)
def tc2addr(self, (h, m, s, f)):
return ((h*60 + m)*60 + s)*25 + f
def fmmode(self, mode):
self._check()
1993-05-12 09:35:44 -03:00
if mode == 'off':
arg = 0
elif mode == 'buffer':
arg = 1
elif mode == 'dnr':
arg = 2
else:
raise error, 'fmmode arg should be off, buffer or dnr'
if not self.simplecmd(FM_SELECT):
return 0
self._number(arg, 1)
if not self.simplecmd(ENTER):
return 0
return 1
def mute(self, mode, value):
self._check()
if mode == 'audio':
cmds = (MUTE_AUDIO_OFF, MUTE_AUDIO)
elif mode == 'video':
cmds = (MUTE_VIDEO_OFF, MUTE_VIDEO)
elif mode == 'av':
cmds = (MUTE_AV_OFF, MUTE_AV)
else:
raise error, 'mute type should be audio, video or av'
cmd = cmds[value]
return self.simplecmd(cmd)
def editmode(self, mode):
self._check()
if mode == 'off':
a0 = a1 = a2 = 0
elif mode == 'format':
a0 = 4
a1 = 7
a2 = 4
elif mode == 'asmbl':
a0 = 1
a1 = 7
a2 = 4
elif mode == 'insert-video':
a0 = 2
a1 = 4
a2 = 0
else:
raise 'editmode should be off,format,asmbl or insert-video'
if not self.simplecmd(EM_SELECT):
return 0
self._number(a0, 1)
self._number(a1, 1)
self._number(a2, 1)
if not self.simplecmd(ENTER):
return 0
return 1
def autoedit(self):
self._check()
return self._endlongcmd(AUTO_EDIT)
def nframerec(self, num):
if not self.simplecmd(N_FRAME_REC):
return 0
self._number(num, 4)
if not self.simplecmd(ENTER):
return 0
return self._endlongcmd('nframerec')
1993-05-12 09:35:44 -03:00
def fmstill(self):
if not self.simplecmd(FM_STILL):
return 0
return self._endlongcmd('fmstill')
def preview(self):
if not self.simplecmd(PREVIEW):
return 0
return self._endlongcmd('preview')
def review(self):
if not self.simplecmd(REVIEW):
return 0
return self._endlongcmd('review')
def search_preroll(self):
if not self.simplecmd(SEARCH_PREROLL):
return 0
return self._endlongcmd('search_preroll')
def edit_pb_standby(self):
if not self.simplecmd(EDIT_PB_STANDBY):
return 0
return self._endlongcmd('edit_pb_standby')
def edit_play(self):
if not self.simplecmd(EDIT_PLAY):
return 0
return self._endlongcmd('edit_play')
1993-05-12 09:35:44 -03:00
def dmcontrol(self, mode):
self._check()
1993-05-12 09:35:44 -03:00
if mode == 'off':
return self.simplecmd(DM_OFF)
if mode == 'multi freeze':
num = 1000
elif mode == 'zoom freeze':
num = 2000
elif mode == 'digital slow':
num = 3000
elif mode == 'freeze':
num = 4011
else:
raise error, 'unknown dmcontrol argument: ' + `mode`
if not self.simplecmd(DM_SET):
return 0
self._number(num, 4)
if not self.simplecmd(ENTER):
return 0
return 1
def fwdshuttle(self, num):
if not self.simplecmd(FWD_SHUTTLE):
return 0
self._number(num, 1)
return 1
def revshuttle(self, num):
if not self.simplecmd(REV_SHUTTLE):
return 0
self._number(num, 1)
return 1
def getentry(self, which):
self._check()
if which == 'in':
cmd = IN_ENTRY_SENSE
elif which == 'out':
cmd = OUT_ENTRY_SENSE
self.replycmd(cmd)
h = self._getnumber(2)
m = self._getnumber(2)
s = self._getnumber(2)
f = self._getnumber(2)
return (h, m, s, f)
def inentry(self, arg):
return self.ioentry(arg, (IN_ENTRY, IN_ENTRY_RESET, \
IN_ENTRY_SET, IN_ENTRY_INC, IN_ENTRY_DEC))
def outentry(self, arg):
return self.ioentry(arg, (OUT_ENTRY, OUT_ENTRY_RESET, \
OUT_ENTRY_SET, OUT_ENTRY_INC, OUT_ENTRY_DEC))
def ioentry(self, arg, (Load, Clear, Set, Inc, Dec)):
self._check()
if type(arg) == type(()):
h, m, s, f = arg
if not self.simplecmd(Set):
return 0
self._number(h,2)
self._number(m,2)
self._number(s,2)
self._number(f,2)
if not self.simplecmd(ENTER):
return 0
return 1
elif arg == 'reset':
cmd = Clear
elif arg == 'load':
cmd = Load
elif arg == '+':
cmd = Inc
elif arg == '-':
cmd = Dec
else:
raise error, 'Arg should be +,-,reset,load or (h,m,s,f)'
return self.simplecmd(cmd)
def cancel(self):
d = self.simplecmd(CL)
self.busy_cmd = None