Wednesday, 22 September 2010

Buffered csp oscilloscope

Add a buffer process so that the oscilloscope gets a whole frame at once.  The buffer process triggers on positive-going zero crossings.

traces.py

#!/usr/bin/env python

#
# Example oscilloscope traces.
#

import sys
from csp.csp import *
from csp.builtins import Sin, Cos, GenerateFloats, Mux2, Delta2
from bscope import Oscilloscope


@forever
def Random(outchan):
    """Endless source of pseudo-random samples.

    On every step one value from random.random() is written to
    outchan, after which the generator yields.
    """
    from random import random as next_sample
    while True:
        outchan.write(next_sample())
        yield


@forever
def Buffer(inchan, outchan, frame_size=512):
    """Collect samples into frames and forward each frame as a list.

    Reads one sample at a time from inchan.  Once frame_size samples
    have been gathered they are written to outchan as a single list
    and a new, empty frame is started.

    Arguments:
    inchan -- channel the individual samples are read from.
    outchan -- channel each completed frame (a list) is written to.
    frame_size -- number of samples per frame.  The default of 512
                  matches the WIDTH used by Oscilloscope in bscope.py.
    """
    frame = []
    while True:
        frame.append(inchan.read())
        if len(frame) >= frame_size:
            outchan.write(frame)
            # Begin a fresh list rather than clearing the one just sent.
            frame = []
        yield

@forever
def TriggeredBuffer(inchan, outchan, frame_size=512):
    """Buffer one frame, starting at a positive-going zero crossing.

    The process cycles through three states:

      1. idle   -- wait for a negative sample, then arm the trigger;
      2. armed  -- wait for a sample that is non-negative and larger
                   than the previous one; that sample starts a frame;
      3. filling -- append every sample until the frame holds
                   frame_size samples, write the frame to outchan as
                   a list, and go back to idle.

    Arguments:
    inchan -- channel the individual samples are read from.
    outchan -- channel each completed frame (a list) is written to.
    frame_size -- number of samples per frame.  The default of 512
                  matches the WIDTH used by Oscilloscope in bscope.py.
    """
    frame = []
    armed = False
    # Only compared while armed, and arming needs at least one earlier
    # sample, so this placeholder is never used in a comparison.
    previous = None
    while True:
        sample = inchan.read()
        if frame:
            # A frame is in progress: keep filling it.
            frame.append(sample)
        elif armed:
            # Rising through zero: this sample becomes the frame start.
            if previous < sample and 0 <= sample:
                frame.append(sample)
        elif sample < 0:
            armed = True
        # Checked after every append so that small frame sizes
        # (including 1) are honoured as well.
        if frame and len(frame) >= frame_size:
            outchan.write(frame)
            frame = []
            armed = False
        previous = sample
        yield

def trace_random():
    """Test the Oscilloscope with random data."""
    # One channel links the random source straight to the display.
    samples = Channel()
    Par(Random(samples), Oscilloscope(samples)).start()


def trace_sin():
    """Plot a sine wave on the oscilloscope."""
    # ramp: generator -> Sin, wave: Sin -> display
    ramp, wave = Channel(), Channel()
    network = Par(
        GenerateFloats(ramp),
        Sin(ramp, wave),
        Oscilloscope(wave),
    )
    network.start()


def trace_cos():
    """Plot a cosine wave on the oscilloscope."""
    # ramp: generator -> Cos, wave: Cos -> buffer, frames: buffer -> display
    ramp, wave, frames = Channel(), Channel(), Channel()
    network = Par(
        GenerateFloats(ramp),
        Cos(ramp, wave),
        TriggeredBuffer(wave, frames),
        Oscilloscope(frames),
    )
    network.start()


def trace_mux():
    """Plot sine and cosine waves on the oscilloscope."""
    # Six channels: the generator output is split in two by Delta2,
    # each half goes through Cos or Sin, and Mux2 merges the results
    # into the single channel read by the display.
    source, to_cos, to_sin, cos_out, sin_out, merged = [
        Channel() for _ in range(6)
    ]
    network = Par(
        GenerateFloats(source),
        Delta2(source, to_cos, to_sin),
        Cos(to_cos, cos_out),
        Sin(to_sin, sin_out),
        Mux2(cos_out, sin_out, merged),
        Oscilloscope(merged),
    )
    network.start()

# Registry of runnable examples: maps the part of each function name
# after the 'trace_' prefix to the module-level function itself.
EXAMPLES = {}
# Iterate over a snapshot: the loop variables are themselves module
# globals, so on Python 3 looping over the live globals() view raises
# "dictionary changed size during iteration".
for name, func in list(globals().items()):
    if name.startswith('trace_'):
        EXAMPLES[name[6:]] = func

if __name__ == '__main__':
    if len(sys.argv) != 2:
        # Wrong number of arguments: print the usage line followed by
        # one line per available example.
        print('Syntax: python {0} {1}'.format(sys.argv[0],
                                              ' | '.join(EXAMPLES.keys())))
        for name, func in EXAMPLES.items():
            # __doc__ exists on Python 2 and 3 (func_doc is Python 2
            # only); an example without a docstring prints as empty.
            print(' {0:<9} {1}'.format(name, (func.__doc__ or '').strip()))
    elif sys.argv[1] not in EXAMPLES:
        print('Unknown example {0}'.format(sys.argv[1]))
    else:
        print('Use cursor up/down for scaling, s for save and q for quit')
        # Run the selected example.
        EXAMPLES[sys.argv[1]]()

bscope.py

#!/usr/bin/env python

"""
Simple oscilloscope traces for python-csp.
Requires Pygame.

Features:
    * Press 's' to save an oscilloscope trace as a PNG.
    * Press UP and DOWN to scale the input more / less.

Copyright (C) Sarah Mount, 2009.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

Changes: (C) Kevin Whitefoot, 2010 

  Allow sender to send a complete frame instead of just a single new sample.

"""


from csp.csp import *

import copy
import numpy
import pygame


__author__ = 'Sarah Mount <s.mount@wlv.ac.uk>'
__date__ = 'November 2009'
__version__ = '0.2'


@forever
def Oscilloscope(inchan, scale=80.0, _process=None):
    """Draw the data read from inchan as an oscilloscope trace.

    Every value read from inchan is handled in one of two ways:

      * a list (or tuple) is taken as a complete frame and replaces
        the whole trace;
      * anything else is taken as a single new sample, which is
        appended to the trace while the oldest sample is dropped.

    Samples are multiplied by scale to obtain a vertical offset in
    pixels from the x-axis.  Samples that fall outside the window are
    not drawn.

    Keys: UP / DOWN change the scale in steps of 10 (it is never
    lowered to zero or below), 's' saves the window to
    'Oscilloscope.png', and 'q' or closing the window ends the loop,
    after which inchan is poisoned and the display is closed.

    Arguments:
    inchan -- channel to read samples or frames from.
    scale -- initial vertical scale factor (pixels per unit).
    _process -- not used in this function body.
                NOTE(review): presumably supplied by the csp
                framework -- confirm.
    """
    # Constants
    WIDTH, HEIGHT = 512, 256
    TRACE, GREY = (80, 255, 100), (110, 110, 110)
    caption = 'Oscilloscope'
    filename = caption + '.png'
    # Open window
    pygame.init()
    screen = pygame.display.set_mode((WIDTH, HEIGHT), 0)
    pygame.display.set_caption(caption)
    # Create a blank chart with vertical ticks, etc.  The dtype must be
    # unsigned: colour components go up to 255, which int8 cannot hold.
    blank = numpy.zeros((WIDTH, HEIGHT, 3), dtype=numpy.uint8)
    # Draw x-axis
    xaxis = HEIGHT // 2
    blank[::, xaxis] = GREY
    # Draw vertical ticks
    vticks = [-100, -50, +50, +100]
    for vtick in vticks:
        blank[::5, xaxis + vtick] = GREY            # Horizontals
    blank[::50, ::5] = GREY                         # Verticals
    # Draw the 'blank' screen.
    pygame.surfarray.blit_array(screen, blank)      # Blit the screen buffer
    pygame.display.flip()                           # Flip the double buffer
    # ydata stores the already-scaled data for the trace.
    ydata = [0.0] * WIDTH

    QUIT = False
    while not QUIT:

        pixels = blank.copy()
        indata = inchan.read()
        if isinstance(indata, (list, tuple)):
            # We have been sent a complete frame.
            ydata = [sample * scale for sample in indata]
        else:
            # We only have one new point.  Use the value already read;
            # reading the channel again here would throw away every
            # other sample.
            ydata.append(indata * scale)
            ydata.pop(0)

        # A frame may be shorter or longer than the window is wide.
        for x in range(min(WIDTH, len(ydata))):
            try:
                row = xaxis - int(ydata[x])
            except (TypeError, ValueError, OverflowError):
                # Non-numeric, NaN or infinite sample: nothing to draw.
                continue
            # Explicit bounds check: a negative index would otherwise
            # wrap round and draw at the opposite edge of the window.
            if 0 <= row < HEIGHT:
                pixels[x, row] = TRACE
        pygame.surfarray.blit_array(screen, pixels)     # Blit the screen buffer
        pygame.display.flip()                           # Flip the double buffer
        del pixels # Use constant space.
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                QUIT = True
            elif event.type != pygame.KEYDOWN:
                continue
            elif event.key == pygame.K_q:
                QUIT = True
            elif event.key == pygame.K_s:
                pygame.image.save(screen, filename)
                print('Saving oscope image in: ' + str ( filename ) )
            elif event.key == pygame.K_UP:
                scale += 10.0
                print('Oscilloscope scaling by %f' % scale)
            elif event.key == pygame.K_DOWN:
                if scale - 10.0 > 0.0:
                    scale -= 10.0
                print('Oscilloscope scaling by %f' % scale)
        yield
    inchan.poison()
    pygame.display.quit()
    return


if __name__ == '__main__':
    # Running this module directly does not start an oscilloscope; it
    # only points the user at the traces.py script.
    print('For this tutorial run traces.py')

Posted via email from kwhitefoot's posterous

No comments:

Post a Comment

Followers