import fcntl import IOCTL from IOCTL import * import sys import struct import select import posix import time DEVICE='/dev/ttyd2' class UnixFile: def open(self, name, mode): self.fd = posix.open(name, mode) return self def read(self, len): return posix.read(self.fd, len) def write(self, data): dummy = posix.write(self.fd, data) def flush(self): pass def fileno(self): return self.fd def close(self): dummy = posix.close(self.fd) def packttyargs(*args): if type(args) <> type(()): raise 'Incorrect argtype for packttyargs' if type(args[0]) == type(1): iflag, oflag, cflag, lflag, line, chars = args elif type(args[0]) == type(()): if len(args) <> 1: raise 'Only 1 argument expected' iflag, oflag, cflag, lflag, line, chars = args[0] elif type(args[0]) == type([]): if len(args) <> 1: raise 'Only 1 argument expected' [iflag, oflag, cflag, lflag, line, chars] = args[0] str = struct.pack('hhhhb', iflag, oflag, cflag, lflag, line) for c in chars: str = str + c return str def nullttyargs(): chars = ['\0']*IOCTL.NCCS return packttyargs(0, 0, 0, 0, 0, chars) def unpackttyargs(str): args = str[:-IOCTL.NCCS] rawchars = str[-IOCTL.NCCS:] chars = [] for c in rawchars: chars.append(c) iflag, oflag, cflag, lflag, line = struct.unpack('hhhhb', args) return (iflag, oflag, cflag, lflag, line, chars) def initline(name): fp = UnixFile().open(name, 2) ofp = fp fd = fp.fileno() rv = fcntl.ioctl(fd, IOCTL.TCGETA, nullttyargs()) iflag, oflag, cflag, lflag, line, chars = unpackttyargs(rv) iflag = iflag & ~(INPCK|ISTRIP|INLCR|IUCLC|IXON|IXOFF) oflag = oflag & ~OPOST cflag = B9600|CS8|CREAD|CLOCAL lflag = lflag & ~(ISIG|ICANON|ECHO|TOSTOP) chars[VMIN] = chr(1) chars[VTIME] = chr(0) arg = packttyargs(iflag, oflag, cflag, lflag, line, chars) dummy = fcntl.ioctl(fd, IOCTL.TCSETA, arg) return fp, ofp # # error = 'VCR.error' # Commands/replies: COMPLETION = '\x01' ACK ='\x0a' NAK ='\x0b' NUMBER_N = 0x30 ENTER = '\x40' EXP_7= '\xde' EXP_8= '\xdf' CL ='\x56' CTRL_ENABLE = EXP_8 + '\xc6' SEARCH_DATA = EXP_8 + '\x93' ADDR_SENSE = '\x60' PLAY ='\x3a' STOP ='\x3f' EJECT='\x2a' FF ='\xab' REW ='\xac' STILL='\x4f' STEP_FWD ='\x2b' # Was: '\xad' FM_SELECT=EXP_8 + '\xc8' FM_STILL=EXP_8 + '\xcd' PREVIEW=EXP_7 + '\x9d' REVIEW=EXP_7 + '\x9e' DM_OFF=EXP_8 + '\xc9' DM_SET=EXP_8 + '\xc4' FWD_SHUTTLE='\xb5' REV_SHUTTLE='\xb6' EM_SELECT=EXP_8 + '\xc0' N_FRAME_REC=EXP_8 + '\x92' SEARCH_PREROLL=EXP_8 + '\x90' EDIT_PB_STANDBY=EXP_8 + '\x96' EDIT_PLAY=EXP_8 + '\x98' AUTO_EDIT=EXP_7 + '\x9c' IN_ENTRY=EXP_7 + '\x90' IN_ENTRY_RESET=EXP_7 + '\x91' IN_ENTRY_SET=EXP_7 + '\x98' IN_ENTRY_INC=EXP_7 + '\x94' IN_ENTRY_DEC=EXP_7 + '\x95' IN_ENTRY_SENSE=EXP_7 + '\x9a' OUT_ENTRY=EXP_7 + '\x92' OUT_ENTRY_RESET=EXP_7 + '\x93' OUT_ENTRY_SET=EXP_7 + '\x99' OUT_ENTRY_INC=EXP_7 + '\x96' OUT_ENTRY_DEC=EXP_7 + '\x98' OUT_ENTRY_SENSE=EXP_7 + '\x9b' MUTE_AUDIO = '\x24' MUTE_AUDIO_OFF = '\x25' MUTE_VIDEO = '\x26' MUTE_VIDEO_OFF = '\x27' MUTE_AV = EXP_7 + '\xc6' MUTE_AV_OFF = EXP_7 + '\xc7' DEBUG=0 class VCR: def __init__(self): self.ifp, self.ofp = initline(DEVICE) self.busy_cmd = None self.async = 0 self.cb = None self.cb_arg = None def _check(self): if self.busy_cmd: raise error, 'Another command active: '+self.busy_cmd def _endlongcmd(self, name): if not self.async: self.waitready() return 1 self.busy_cmd = name return 'VCR BUSY' def fileno(self): return self.ifp.fileno() def setasync(self, async): self.async = async def setcallback(self, cb, arg): self.setasync(1) self.cb = cb self.cb_arg = arg def poll(self): if not self.async: raise error, 'Can only call poll() in async mode' if not self.busy_cmd: return if self.testready(): if self.cb: apply(self.cb, self.cb_arg) def _cmd(self, cmd): if DEBUG: print '>>>',`cmd` self.ofp.write(cmd) self.ofp.flush() def _waitdata(self, len, tout): rep = '' while len > 0: if tout == None: ready, d1, d2 = select.select( \ [self.ifp], [], []) else: ready, d1, d2 = select.select( \ [self.ifp], [], [], tout) if ready == []: ## if rep: ## print 'FLUSHED:', `rep` return None data = self.ifp.read(1) if DEBUG: print '<<<',`data` if data == NAK: return NAK rep = rep + data len = len - 1 return rep def _reply(self, len): data = self._waitdata(len, 10) if data == None: raise error, 'Lost contact with VCR' return data def _getnumber(self, len): data = self._reply(len) number = 0 for c in data: digit = ord(c) - NUMBER_N if digit < 0 or digit > 9: raise error, 'Non-digit in number'+`c` number = number*10 + digit return number def _iflush(self): dummy = self._waitdata(10000, 0) ## if dummy: ## print 'IFLUSH:', dummy def simplecmd(self,cmd): self._iflush() for ch in cmd: self._cmd(ch) rep = self._reply(1) if rep == NAK: return 0 elif rep <> ACK: raise error, 'Unexpected reply:' + `rep` return 1 def replycmd(self, cmd): if not self.simplecmd(cmd[:-1]): return 0 self._cmd(cmd[-1]) def _number(self, number, digits): if number < 0: raise error, 'Unexpected negative number:'+ `number` maxnum = pow(10, digits) if number > maxnum: raise error, 'Number too big' while maxnum > 1: number = number % maxnum maxnum = maxnum / 10 digit = number / maxnum ok = self.simplecmd(chr(NUMBER_N + digit)) if not ok: raise error, 'Error while transmitting number' def initvcr(self, *optarg): timeout = None if optarg <> (): timeout = optarg[0] starttime = time.time() self.busy_cmd = None self._iflush() while 1: ## print 'SENDCL' self._cmd(CL) rep = self._waitdata(1, 2) ## print `rep` if rep in ( None, CL, NAK ): if timeout: if time.time() - starttime > timeout: raise error, \ 'No reply from VCR' continue break if rep <> ACK: raise error, 'Unexpected reply:' + `rep` dummy = self.simplecmd(CTRL_ENABLE) def waitready(self): rep = self._waitdata(1, None) if rep == None: raise error, 'Unexpected None reply from waitdata' if rep not in (COMPLETION, ACK): self._iflush() raise error, 'Unexpected waitready reply:' + `rep` self.busy_cmd = None def testready(self): rep = self._waitdata(1, 0) if rep == None: return 0 if rep not in (COMPLETION, ACK): self._iflush() raise error, 'Unexpected waitready reply:' + `rep` self.busy_cmd = None return 1 def play(self): return self.simplecmd(PLAY) def stop(self): return self.simplecmd(STOP) def ff(self): return self.simplecmd(FF) def rew(self): return self.simplecmd(REW) def eject(self):return self.simplecmd(EJECT) def still(self):return self.simplecmd(STILL) def step(self): return self.simplecmd(STEP_FWD) def goto(self, (h, m, s, f)): if not self.simplecmd(SEARCH_DATA): return 0 self._number(h, 2) self._number(m, 2) self._number(s, 2) self._number(f, 2) if not self.simplecmd(ENTER): return 0 return self._endlongcmd('goto') # XXXX TC_SENSE doesn't seem to work def faulty_where(self): self._check() self._cmd(TC_SENSE) h = self._getnumber(2) m = self._getnumber(2) s = self._getnumber(2) f = self._getnumber(2) return (h, m, s, f) def where(self): return self.addr2tc(self.sense()) def sense(self): self._check() self._cmd(ADDR_SENSE) num = self._getnumber(5) return num def addr2tc(self, num): f = num % 25 num = num / 25 s = num % 60 num = num / 60 m = num % 60 h = num / 60 return (h, m, s, f) def tc2addr(self, (h, m, s, f)): return ((h*60 + m)*60 + s)*25 + f def fmmode(self, mode): self._check() if mode == 'off': arg = 0 elif mode == 'buffer': arg = 1 elif mode == 'dnr': arg = 2 else: raise error, 'fmmode arg should be off, buffer or dnr' if not self.simplecmd(FM_SELECT): return 0 self._number(arg, 1) if not self.simplecmd(ENTER): return 0 return 1 def mute(self, mode, value): self._check() if mode == 'audio': cmds = (MUTE_AUDIO_OFF, MUTE_AUDIO) elif mode == 'video': cmds = (MUTE_VIDEO_OFF, MUTE_VIDEO) elif mode == 'av': cmds = (MUTE_AV_OFF, MUTE_AV) else: raise error, 'mute type should be audio, video or av' cmd = cmds[value] return self.simplecmd(cmd) def editmode(self, mode): self._check() if mode == 'off': a0 = a1 = a2 = 0 elif mode == 'format': a0 = 4 a1 = 7 a2 = 4 elif mode == 'asmbl': a0 = 1 a1 = 7 a2 = 4 elif mode == 'insert-video': a0 = 2 a1 = 4 a2 = 0 else: raise 'editmode should be off,format,asmbl or insert-video' if not self.simplecmd(EM_SELECT): return 0 self._number(a0, 1) self._number(a1, 1) self._number(a2, 1) if not self.simplecmd(ENTER): return 0 return 1 def autoedit(self): self._check() return self._endlongcmd(AUTO_EDIT) def nframerec(self, num): if not self.simplecmd(N_FRAME_REC): return 0 self._number(num, 4) if not self.simplecmd(ENTER): return 0 return self._endlongcmd('nframerec') def fmstill(self): if not self.simplecmd(FM_STILL): return 0 return self._endlongcmd('fmstill') def preview(self): if not self.simplecmd(PREVIEW): return 0 return self._endlongcmd('preview') def review(self): if not self.simplecmd(REVIEW): return 0 return self._endlongcmd('review') def search_preroll(self): if not self.simplecmd(SEARCH_PREROLL): return 0 return self._endlongcmd('search_preroll') def edit_pb_standby(self): if not self.simplecmd(EDIT_PB_STANDBY): return 0 return self._endlongcmd('edit_pb_standby') def edit_play(self): if not self.simplecmd(EDIT_PLAY): return 0 return self._endlongcmd('edit_play') def dmcontrol(self, mode): self._check() if mode == 'off': return self.simplecmd(DM_OFF) if mode == 'multi freeze': num = 1000 elif mode == 'zoom freeze': num = 2000 elif mode == 'digital slow': num = 3000 elif mode == 'freeze': num = 4011 else: raise error, 'unknown dmcontrol argument: ' + `mode` if not self.simplecmd(DM_SET): return 0 self._number(num, 4) if not self.simplecmd(ENTER): return 0 return 1 def fwdshuttle(self, num): if not self.simplecmd(FWD_SHUTTLE): return 0 self._number(num, 1) return 1 def revshuttle(self, num): if not self.simplecmd(REV_SHUTTLE): return 0 self._number(num, 1) return 1 def getentry(self, which): self._check() if which == 'in': cmd = IN_ENTRY_SENSE elif which == 'out': cmd = OUT_ENTRY_SENSE self.replycmd(cmd) h = self._getnumber(2) m = self._getnumber(2) s = self._getnumber(2) f = self._getnumber(2) return (h, m, s, f) def inentry(self, arg): return self.ioentry(arg, (IN_ENTRY, IN_ENTRY_RESET, \ IN_ENTRY_SET, IN_ENTRY_INC, IN_ENTRY_DEC)) def outentry(self, arg): return self.ioentry(arg, (OUT_ENTRY, OUT_ENTRY_RESET, \ OUT_ENTRY_SET, OUT_ENTRY_INC, OUT_ENTRY_DEC)) def ioentry(self, arg, (Load, Clear, Set, Inc, Dec)): self._check() if type(arg) == type(()): h, m, s, f = arg if not self.simplecmd(Set): return 0 self._number(h,2) self._number(m,2) self._number(s,2) self._number(f,2) if not self.simplecmd(ENTER): return 0 return 1 elif arg == 'reset': cmd = Clear elif arg == 'load': cmd = Load elif arg == '+': cmd = Inc elif arg == '-': cmd = Dec else: raise error, 'Arg should be +,-,reset,load or (h,m,s,f)' return self.simplecmd(cmd) def cancel(self): d = self.simplecmd(CL) self.busy_cmd = None