Source code for spi_rack.s4g_module

#------------------------------------------------------------------------------
# Description    : S4g SPI module QCoDeS interface
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2021)
#------------------------------------------------------------------------------
import numpy as np
import threading
from time import sleep

from qcodes.instrument.channel import InstrumentChannel
from qcodes.instrument.parameter import ManualParameter
from qcodes import validators

from functools import partial
from typing import List, Optional

from spirack.S4g_module import S4g_module as S4g_api
from spi_rack.spi_module_base import spi_module_base


class s4g_module(spi_module_base):
    """
    `QCoDeS <https://qcodes.github.io/Qcodes/>`- style instrument channel driver for the S4g SPI module.
    """

    NUMBER_OF_DACS = 4  # Set by hardware constraints

    def __init__(self, parent, name: str, address: int, reset_currents: bool = True,
                 dac_names: Optional[List[str]] = None):
        """
        Instantiates the driver object. This is the object that should be instantiated by the
        add_spi_module function.

        Parameters
        ----------
        parent
            Reference to the spi_rack parent object. This is handled by the add_spi_module function.
        name : str
            Name given to the InstrumentChannel.
        address : int
            Module number set on the hardware.
        reset_currents : bool
            If True, then reset all currents to zero and change the span to `range_max_bi`.
        dac_names : List[str]
            List of all the names to use for the dac channels. If no list is given or is None, the
            default name "dac{i}" is used for the i-th dac channel.

        Raises
        ----------
        ValueError
            Length of the dac names list does not match the number of dacs.
        """
        # Validate the channel names up front, so that an invalid argument is rejected before the
        # base class is initialised and before the S4g API object is created.
        if dac_names is None:
            channel_names = [f"dac{dac}" for dac in range(self.NUMBER_OF_DACS)]
        elif len(dac_names) == self.NUMBER_OF_DACS:
            channel_names = list(dac_names)
        else:
            raise ValueError(f"Length of dac_names must be {self.NUMBER_OF_DACS}")

        super().__init__(parent, name, address)

        self.api = S4g_api(parent.spi_rack, module=address, reset_currents=reset_currents)

        # One s4g_dac_channel per physical DAC, kept in DAC order and registered as a submodule
        # under its channel name.
        self._channels = []
        for dac, ch_name in enumerate(channel_names):
            channel = s4g_dac_channel(self, ch_name, dac)
            self._channels.append(channel)
            self.add_submodule(ch_name, channel)

    def set_dacs_zero(self):
        """
        Sets all currents of all outputs to 0.

        The value is written through each channel's `current` parameter.
        """
        for ch in self._channels:
            ch.current(0)
class s4g_dac_channel(InstrumentChannel):
    """
    `QCoDeS <https://qcodes.github.io/Qcodes/>`- style instrument channel driver for the dac channels of the S4g module.
    This class is used by the S4g_module to define the individual dac channels and should not be used directly.
    """

    class _RampThread(threading.Thread):
        """Worker thread that performs a current ramp and can be cancelled cooperatively."""

        def __init__(self, *args, **kwargs):
            super().__init__(target=self.worker_function, *args, **kwargs)
            # An Event (rather than a plain flag plus sleep) lets a cancel request interrupt the
            # wait between two steps immediately. The name deliberately avoids `_stopped`, which
            # some CPython versions use internally on threading.Thread.
            self._stop_requested = threading.Event()

        def worker_function(self, ch, dac, vals, dt):
            """
            Conducts the actual ramping.

            Waits `dt` seconds before each setpoint in `vals` and writes it through
            `ch.set_current_instant`. Returns early, without writing, as soon as a stop has been
            requested.
            """
            try:
                for val in vals:
                    # wait() returns True when exit() was called during (or before) the wait.
                    if self._stop_requested.wait(dt):
                        return
                    ch.set_current_instant(dac, val)
            finally:
                # Always clear the flag, also when the write raised. A replacement ramp joins this
                # thread before it raises the flag again, so this cannot clobber the new state.
                ch._is_ramping = False

        def exit(self):
            """Requests the thread to stop; it ends before writing any further setpoint."""
            self._stop_requested.set()

    def __init__(self, parent: s4g_module, name: str, dac: int):
        """
        Constructor for the dac channel instrument channel.

        Parameters
        ----------
        parent : s4g_module
            Reference to the parent s4g_module
        name : str
            Name for the instrument channel
        dac : int
            Number of the dac that this channel corresponds to
        """
        super().__init__(parent, name)
        self._api = parent.api
        self._is_ramping = False
        self._ramp_thread = None

        self.add_parameter(
            'current',
            get_cmd=partial(self._get_current, dac),
            set_cmd=partial(self._set_current, dac),
            unit="A",
            vals=validators.Numbers(min_value=-40e-3, max_value=40e-3),
            docstring="Sets the output current of the dac channel. Depending on the value of "
                      "ramping_enabled, the output value is either achieved through slowly ramping, or "
                      "instantaneously set."
        )
        self.add_parameter(
            'span',
            val_mapping={'range_max_uni': 0, 'range_max_bi': 2, 'range_min_bi': 4},
            get_cmd=partial(self._get_span, dac),
            # FIXME: change to change_span instead of change_span_update when bug in api is fixed
            set_cmd=partial(self._api.change_span_update, dac),
            docstring="Sets the max range of the DACs. Possible values:"
                      "\t\'range_max_uni\':\t0 - 40 mA"
                      "\t\'range_max_bi\':\t-40 - 40 mA"
                      "\t\'range_min_bi\':\t-20 - 20 mA"
        )
        self.add_parameter(
            'ramp_rate',
            unit='A/s',
            initial_value=1e-3,  # 1 mA/s
            docstring='Limits the rate at which currents can be changed. The size of of steps is still '
                      'limited by `ramp_max_step`.',
            parameter_class=ManualParameter
        )
        self.add_parameter(
            'ramp_max_step',
            unit='A',
            initial_value=0.5e-3,
            docstring='Sets the maximum step size for current ramping. The rate at which it ramps is set'
                      ' by `ramp_rate`.',
            parameter_class=ManualParameter
        )
        self.add_parameter(
            'ramping_enabled',
            initial_value=False,
            vals=validators.Bool(),
            parameter_class=ManualParameter,
            docstring='Turns ramping on or off. Toggling `ramping_enabled` changed the behavior of the '
                      'setter for the `current` parameter. If enabled, ramping is done at a rate set by '
                      '`ramp_rate` and in steps specified by `ramp_max_step`.'
        )
        self.add_parameter(
            'is_ramping',
            get_cmd=lambda: self._is_ramping,
            set_cmd=False,
            docstring="Returns whether the dac is currently in the process of ramping."
        )
        self.add_parameter(
            'stepsize',
            unit='A',
            set_cmd=False,
            get_cmd=partial(self._api.get_stepsize, dac),
            docstring="Returns the smallest current step allowed by the dac for the current settings."
        )

    def _get_span(self, dac):
        """
        Gets the span set by the module.

        Returns the second element of the pair returned by the API's `get_settings(dac)`.
        """
        _, span = self._api.get_settings(dac)
        return span

    def _get_current(self, dac):
        """
        Gets the current set by the module.

        Returns the first element of the pair returned by the API's `get_settings(dac)`.
        """
        current, _ = self._api.get_settings(dac)
        return current

    def _set_current(self, dac, val):
        """
        Sets the current either through ramping or instantly, depending on `ramping_enabled`.
        """
        if self.ramping_enabled():
            self._set_current_ramp(dac, val)
        else:
            self.set_current_instant(dac, val)

    def _set_current_ramp(self, dac, val):
        """
        Ramps the current in steps set by `ramp_max_step` with a rate set by `ramp_rate`. Ramping is
        non-blocking so the user should check `is_ramping() == False` to see if the final value is
        reached.

        A ramp that is still in progress is cancelled first; the new ramp then starts from the
        current value reported by the module.

        Parameters
        ----------
        dac : int
            Number of the dac to ramp.
        val : float
            Target current in A.
        """
        # Cancel the previous ramp and wait until its thread has really ended, so it can neither
        # write a stale setpoint nor reset `_is_ramping` while the new ramp is running.
        previous = self._ramp_thread
        if previous is not None and previous.is_alive():
            previous.exit()
            previous.join()

        val_begin = self.current()
        distance = np.abs(val_begin - val)
        num_steps = int(np.ceil(distance / self.ramp_max_step()))

        if num_steps == 0:
            # Already at the requested value: nothing to write, and no division by zero for dt.
            self._is_ramping = False
            return

        self._is_ramping = True
        # num_steps intervals need num_steps + 1 grid points. The first point is the value the
        # output already has, so it is dropped; the last point is exactly `val`. This keeps every
        # step at or below `ramp_max_step` and guarantees the target is reached.
        vals = np.linspace(val_begin, val, num_steps + 1)[1:]
        # Wait time per step so that the complete ramp takes distance / ramp_rate seconds.
        dt = distance / self.ramp_rate() / num_steps

        th = self._RampThread(args=(self, dac, vals, dt))
        self._ramp_thread = th
        th.start()

    def set_current_instant(self, dac, val):
        """
        Wrapper function around the set_current API call.

        Parameters
        ----------
        dac : int
            Number of the dac to set.
        val : float
            Current to output, passed on unchanged to the API.
        """
        self._api.set_current(dac, val)