Source code for instruments.AragonBOSA400

#import visa

import time
import socket
import logging
import csv
import numpy

OK = True
ERROR_BGN = 'Command'

[docs]class AragonBOSA400(object): ''' This class models the AndoAQ4321 laser. .. note:: When using any laser command, remember to send shut-off-laser command at the end of each sweep command set. For Trigger Sweep, send shut-off-laser command after sweep ends (sweep end condition noted in TriggerSweepSetup function) ''' def __init__(self, res_manager, address=''): ''' Constructor method :param res_manager: PyVisa resource manager :type res_manager: PyVisa resourceManager object :param address: SCPI address of instrument :type address: string ''' = False self.max_wavelength = 1579.9 self.min_wavelength = 1520 self.osa = BOSA("LAN", address)
[docs] def whoAmI(self): ''':returns: reference to device''' return 'OSA'
[docs] def change_state(self): = not
[docs] def get_max_wavelength(self): ''' Queries the maximum allowed wavelength :returns: Integer ''' return self.max_wavelength
[docs] def get_min_wavelength(self): ''' Queries the minimum allowed wavelength :returns: Integer ''' return self.min_wavelength
[docs] def setwavelength(self, wavelength): ''' Loads a single wavelength (in nanometers) and sets output high :param waveLength: Specified wavelength :type waveLength: Integer ''' command = 'sens:wav:stat {} nm'.format(wavelength) response = self.osa.ask(command) check_response(command, response, OK)
[docs] def setpower(self, wavelength): ''' BOSA cannot adjust output power - raise exception ''' raise BOSAException('Cannot adjust output power.')
[docs] def sweepWavelengthsTriggerSetup (self, start, end, step): ''' Have to keep track of Triggers in main command, use Stop Sweep Command to end sweep. Extra triggers do not make the laser sweep outside of specified end wavelength. Remeber to shut off laser after sweep ends :param start: Specified wavelength between 1520-1580 :type start: Integer :param end: Specified wavelength between 1520-1580 :type end: Integer :param step: Specified step in nm :type step: Float ''' if start < self.get_min_wavelength() or start > self.get_max_wavelength(): raise BOSAException('start wavelength Out of Bounds.') if end < self.get_min_wavelength() or end > self.get_max_wavelength(): raise BOSAException('end wavelength Out of Bounds.') if start > end: raise BOSAException('start wavelength must be smaller than end.') # set single sweep command = 'sens:wav:single on' response = self.osa.ask(command) check_response(command, response, OK) # set start waveLength command = 'sens:wav:star {} nm'.format(start) response = self.osa.ask(command) check_response(command, response, OK) # set end wavelength command = 'sens:wav:stop {} nm'.format(end) response = self.osa.ask(command) check_response(command, response, OK) # set sweep speed command = 'sens:wav:speed {} nm'.format(step) response = self.osa.ask(command) check_response(command, response, OK)
[docs] def trigger(self): '''Triggers laser - Now this just starts the sweep''' command = 'sens:sweep on' response = self.osa.ask(command) check_response(command, response, OK)
[docs] def stopSweep(self): '''Stop sweep''' command = 'sens:sweep off' response = self.osa.ask(command) check_response(command, response, OK)
[docs] def outputOFF(self): ''' Turns output of laser source OFF ''' command = 'sens:switch off' response = self.osa.ask(command) check_response(command, response, OK)
[docs] def outputON(self): ''' Turns output of laser source OFF ''' command = 'sens:switch on' response = self.osa.ask(command) check_response(command, response, OK) # wait until the laser is on while True: command = 'sens:switch?' response = self.osa.ask(command) check_response(command, response) if response.lower().startswith('on'): break
[docs] def getwavelength(self): ''' Queries wavelength of the laser :returns: Float ''' command = 'sens:wav:stat?' response = self.osa.ask(command) check_response(command, response) print response # TODO finalize how to parse and return a nmber
[docs] def getpower(self): ''' Gets output power in dbm :returns: Float ''' pass
[docs] def get_feedback(self): return self.getpower()
[docs] def getAuxInputPower(self): pass
[docs] def sweepWavelengthsContinuous (self, start, end, time): ''' Executes a sweep with respect to a specified time :param start: Specified wavelength between 1520-1580 :type start: Integer :param end: Specified wavelength between 1520-1580 :type end: Integer :param time: Specified time :type time: Float ''' pass
###### Function Calls Component Analyzer
[docs] def getIL(self, start, end , step, polarization): ''' Executes a sweep with respect to a specified time and collect injection loss information :param start: Specified wavelength between 1520-1580 :type start: Integer :param end: Specified wavelength between 1520-1580 :type end: Integer :param step: Specified step :type step: Float :param step: polarization mode :type step: String '''
[docs] def getRL(self, start, end , step, polarization): ''' Executes a sweep with respect to a specified time and collect reflection loss information :param start: Specified wavelength between 1520-1580 :type start: Integer :param end: Specified wavelength between 1520-1580 :type end: Integer :param step: Specified step :type step: Float :param step: polarization mode :type step: String '''
##### Function Calls OSA
[docs] def getSpectrum(self, start, end , step, polarization): ''' Executes a sweep with respect to a specified time and collect reflection loss information :param start: Specified wavelength between 1520-1580 :type start: Integer :param end: Specified wavelength between 1520-1580 :type end: Integer :param step: Specified step :type step: Float :param step: polarization mode :type step: String NOTE: polarization arg doesn't do anything yet! ''' if start < self.get_min_wavelength() or start > self.get_max_wavelength(): raise BOSAException('start wavelength Out of Bounds.') if end < self.get_min_wavelength() or end > self.get_max_wavelength(): raise BOSAException('end wavelength Out of Bounds.') if start > end: raise BOSAException('start wavelength must be smaller than end.') resolution = nm_to_ghz(step) # set start command = 'sens:wav:star {} nm'.format(start) response = self.osa.ask(command) check_response(command, response, OK) # set stop command = 'sens:wav:stop {} nm'.format(end) response = self.osa.ask(command) check_response(command, response, OK) # set resolution command = 'sens:wav:res {} ghz'.format(resolution) response = self.osa.ask(command) check_response(command, response, OK) # command = 'trace:data:count?' # response = self.osa.ask(command) # check_response(command, response) # print response # get the trace command = 'trace:data?' response = self.osa.ask(command) check_response(command, response) # parse csv string trace_data = response.split(',') n_trace_data = numpy.array(trace_data) n_trace_data = numpy.reshape(n_trace_data, (len(trace_data) / 2, 2)) # take only enough points accoridng to step # NOTE: BOSA does't change its sampling rate; it only changes it on the display original_size = numpy.size(n_trace_data, 0) print 'orig: ', original_size target_size = int((end - start + 1) / float(step) + 1) print 'targ: ', target_size skip_step = original_size / target_size print 'skip: ', skip_step return_data = n_trace_data[::skip_step] return return_data.astype(float).tolist()
[docs]def check_response(command, response, OK=False): if response.startswith(ERROR_BGN) or (OK and not response.startswith(OK_MSG)): raise BOSAException('cmd:< {} > repsonded with: {}'.format(command, response))
[docs]def nm_to_ghz(nm): return nm * 125
[docs]class BOSAException(Exception): pass
# create logger log = logging.getLogger(__name__) if(len(log.handlers) == 0): # check if the logger already exists # create logger log.setLevel(logging.INFO) # create console handler and set level to debug ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) # create formatter formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # add formatter to ch ch.setFormatter(formatter) # add ch to logger log.addHandler(ch)
[docs]class BOSA: """ class BOSA: driver used to communicate with BOSA equipment""" def __init__(self, interfaceType, location, portNo = 10000, IDN=True, Reset = False): """create the OSA object and tries to establish a connection with the equipment Parameters: interfaceType -> LAN, GPIB interface utilyzed in connection location -> IP address or GPIB address of equipment. portN -> no of the port where the interface is open (LAN) """ self.interfaceType = interfaceType self.location = location self.portNo = portNo self.activeTrace = None if(interfaceType.lower() == "lan"):"Connection to OSA using Lan interface on %r",location) try: self.connectLan() except Exception as e: log.exception("Could not connect to OSA device") print(e) raise e return elif(interfaceType.lower() == "gpib"):"GPIB interface chosen to connect OSA on %r",location) try: self.interface = visa.GpibInstrument(location) except Exception as e: log.exception("couldn't connect to device") print(e) raise e return"Connected to device.") else: log.error("Interface Type " + interfaceType + " not valid") raise Exception("interface type invalid") return if(IDN): try: log.debug("Sending IDN to device...") self.write("*IDN?") except Exception as e: log.exception("Could not send *IDN? device") print(e) raise e log.debug("IDN send, waiting response...") try: response = except Exception as e: log.exception("Could read response from device") print(e) raise e print("IDN= " + response) if(Reset): try:"resting device") self.write("*RST") except Exception as e: log.exception("Could not reset device") print(e) raise e def __del__(self): try: if(self.interfaceType == "LAN"): self.interface.close() elif(self.interfaceType == "GPIB"): self.interface.close() except Exception as e: log.warning("could not close interface correctly: exception %r", e.message)
[docs] def connectLan(self): """ connect the instrument to a LAN """ log.debug("creating socket") self.interface = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.interface.settimeout(30) try: log.debug("Connecting to remote socket...") self.interface.connect((self.location, self.portNo)) except Exception as e: log.exception("Could not connection to remote socket") print(e) raise e log.debug("Connected to remote socket")"OSA ready!")
[docs] def write(self, command): """ write to equiment: independent of the interface Parameters: command -> data to send to device + \r\n """ if(self.interfaceType.lower() == "lan"): log.debug("Sending command '" + command + "' using LAN interface...") try: self.interface.sendall( (command + "\r\n").encode()) except Exception as e: log.exception("Could not send data, command %r",command) print(e) raise e elif(self.interfaceType.lower() == "gpib"): log.debug("Sending command '" + command + "' using GPIB interface...") try: self.interface.write(command + "\r\n") except Exception as e: log.exception("Could not send data, command %r",command) print(e) raise e
[docs] def read(self): """ read something from device""" message = "" if(self.interfaceType.lower() == "lan"): log.debug("Reading data using LAN interface...") while(1): try: data = self.interface.recv(19200) message += data.decode() except Exception as e: log.exception("Could not read data") print(e) raise e if("\n" in message): break log.debug("All data readed!") elif(self.interfaceType.lower() == "gpib"): log.debug("Reading data using GPIB interface...") while(1): try: message = except Exception as e: log.exception("Could not read data") print(e) raise e if("\n" in message): break log.debug("All data readed!") log.debug("Data received: " + message) return message
[docs] def ask(self, command): """ writes and reads data""" data = "" self.write(command) data = return data
if __name__ == '__main__': import visa from time import sleep import matplotlib.pyplot as plt bosa = AragonBOSA400(visa.ResourceManager(), '') spec = bosa.getSpectrum(1540.5, 1560, 0.01, 0) print len(spec) plt.scatter(*zip(*spec)) print('done.') ''' Copyright (C) 2017 Robert Polster This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <>. '''