Source code for spi_rack.s4g_module

# ------------------------------------------------------------------------------
# Description    : S4g SPI module QCoDeS interface
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2021)
# ------------------------------------------------------------------------------
import numpy as np
import threading
from time import sleep

from qcodes.instrument.channel import InstrumentChannel
from qcodes.instrument.parameter import ManualParameter
from qcodes import validators

from functools import partial
from typing import List, Optional

from spirack.S4g_module import S4g_module as S4g_api
from spi_rack.spi_module_base import spi_module_base


class dummy_s4g_api:
    """
    Stand-in for the spirack S4g API
    (https://github.com/mtiggelman/SPI-rack/blob/master/spirack/S4g_module.py).

    Keeps the current and span of every DAC in plain Python lists so the dummy
    drivers can run without hardware.
    """

    # Span name -> span code, as used by the `span` argument of the methods below.
    SPAN_MAPPING = {"range_max_uni": 0, "range_max_bi": 2, "range_min_bi": 4}
    # Number of discrete DAC codes (18 bit).
    DAC_RESOLUTION = 2 ** 18
    # Number of current outputs mocked per module.
    NUMBER_OF_DACS = 4

    def __init__(self, spi_rack, module, max_current=50e-3, reset_currents=True):
        """
        Creates the mock communication layer. The signature mirrors the
        `spirack.S4g_module.S4g_module` constructor.

        Parameters
        ----------
        spi_rack : dummy_spi_api
            Mock SPI_rack class object via which the communication runs
        module : int
            module number set on the hardware
        max_current : float
            maximum range of the S4g, configured in hardware
        reset_currents : bool
            if True, all spans start at `range_max_bi`; otherwise they start
            at span code 0. The stored currents start at zero in both cases.

        Returns
        ----------
        """
        dac_count = self.NUMBER_OF_DACS

        if reset_currents:
            start_span = self.SPAN_MAPPING["range_max_bi"]
        else:
            start_span = 0

        self.parent = spi_rack
        self.address = module
        self.max_current = max_current
        self._num_dacs = dac_count
        self._currents = [0.0 for _ in range(dac_count)]
        self._spans = [start_span for _ in range(dac_count)]

    def change_span_update(self, DAC: int, span: int):
        """
        Mocks the `change_span_update` function of the API by storing the span.

        Parameters
        ----------
        DAC : int
            Current output of which to change the span
        span : int
            values for the span as mentioned in the datasheet, use from SPAN_MAPPING

        Returns
        ----------
        """
        self._spans[DAC] = span

    def change_span(self, DAC, span):
        """
        Mocks the `change_span` function of the API. In this mock it simply
        forwards to `change_span_update`.

        Parameters
        ----------
        DAC : int
            Current output of which to change the span
        span : int
            values for the span as mentioned in the datasheet, use from SPAN_MAPPING

        Returns
        ----------
        """
        self.change_span_update(DAC, span)

    def set_current(self, DAC: int, current: float):
        """
        Mocks the `set_current` function of the API by storing the value.

        Parameters
        ----------
        DAC : int
            Current output of which to update the current
        current : float
            new DAC current

        Returns
        ----------
        """
        self._currents[DAC] = current

    def get_settings(self, DAC: int):
        """
        Mocks the `get_settings` function of the API.

        Parameters
        ----------
        DAC : int
            Current output of which the settings will be read

        Returns
        ----------
        float
            current
        int
            span
        """
        stored_current = self._currents[DAC]
        stored_span = self._spans[DAC]
        return stored_current, stored_span

    def get_stepsize(self, DAC):
        """
        Mocks the `get_stepsize` function of the API.

        Parameters
        ----------
        DAC : int
            Current output of which the stepsize is calculated

        Returns
        ----------
        float
            Smallest current step possible with DAC
        """
        # Only the `range_max_bi` span yields the doubled step size.
        uses_max_bipolar = self._spans[DAC] == self.SPAN_MAPPING["range_max_bi"]
        if not uses_max_bipolar:
            return self.max_current / self.DAC_RESOLUTION
        return 2 * self.max_current / self.DAC_RESOLUTION
class s4g_module(spi_module_base):
    """
    `QCoDeS <https://qcodes.github.io/Qcodes/>`- style instrument channel driver for the S4g SPI module.
    """

    NUMBER_OF_DACS = 4  # Set by hardware constraints

    def __init__(
        self,
        parent,
        name: str,
        address: int,
        reset_currents: bool = True,
        dac_names: Optional[List[str]] = None,
        is_dummy: bool = False,
    ):
        """
        Instantiates the driver object. This is the object that should be instantiated by the
        add_spi_module function.

        Parameters
        ----------
        parent
            Reference to the spi_rack parent object. This is handled by the add_spi_module function.
        name : str
            Name given to the InstrumentChannel.
        address : int
            Module number set on the hardware.
        reset_currents : bool
            If True, then reset all currents to zero and change the span to `range_max_bi`.
        dac_names : List[str]
            List of all the names to use for the dac channels. If no list is given or is None, the
            default name "dac{i}" is used for the i-th dac channel.
        is_dummy : bool
            If True, the mock API (`dummy_s4g_api`) is used instead of the spirack `S4g_module` API.

        Returns
        ----------

        Raises
        ----------
        ValueError
            Length of the dac names list does not match the number of dacs.
        """
        # Validate the arguments before anything with side effects runs, so an invalid call
        # neither registers the channel nor instantiates the API.
        if dac_names is None:
            dac_names = [f"dac{dac}" for dac in range(self.NUMBER_OF_DACS)]
        elif len(dac_names) != self.NUMBER_OF_DACS:
            raise ValueError(f"Length of dac_names must be {self.NUMBER_OF_DACS}")

        super().__init__(parent, name, address)

        api = dummy_s4g_api if is_dummy else S4g_api
        self.api = api(parent.spi_rack, module=address, reset_currents=reset_currents)

        # One submodule per dac; the list keeps them in dac order for `set_dacs_zero`.
        self._channels = []
        for dac, ch_name in enumerate(dac_names):
            channel = s4g_dac_channel(self, ch_name, dac)
            self._channels.append(channel)
            self.add_submodule(ch_name, channel)

    def set_dacs_zero(self):
        """
        Sets all currents of all outputs to 0 by calling the `current` parameter of every dac
        channel.

        Returns
        ----------
        """
        for ch in self._channels:
            ch.current(0)
class s4g_dac_channel(InstrumentChannel):
    """
    `QCoDeS <https://qcodes.github.io/Qcodes/>`- style instrument channel driver for the dac channels of the S4g
    module. This class is used by the S4g_module to define the individual dac channels and should not be used
    directly.
    """

    class _RampThread(threading.Thread):
        """Worker thread that ramps one dac and can be stopped cooperatively."""

        def __init__(self, ch, dac, vals, dt):
            """
            Parameters
            ----------
            ch : s4g_dac_channel
                Channel whose output is ramped.
            dac : int
                The dac of which to set the current.
            vals
                Sequence of current values to set, in order. The last one is the target value.
            dt : float
                Waiting time in seconds before each value is set.
            """
            super().__init__()
            self._ch = ch
            self._dac = dac
            self._vals = vals
            self._dt = dt
            self._stop_event = threading.Event()

        def run(self):
            """Conducts the actual ramping."""
            for val in self._vals:
                # `Event.wait` acts as an interruptible sleep: it returns True as soon as `exit`
                # is called, so a stopped worker never writes another value.
                if self._stop_event.wait(self._dt):
                    return
                self._ch.set_current_instant(self._dac, val)
            # Only a ramp that ran to completion clears the flag; a stopped ramp leaves it to
            # whoever stopped it.
            self._ch._is_ramping = False

        def exit(self):
            """Stops the thread."""
            self._stop_event.set()

    def __init__(self, parent: s4g_module, name: str, dac: int):
        """
        Constructor for the dac channel instrument channel.

        Parameters
        ----------
        parent : s4g_module
            Reference to the parent s4g_module
        name : str
            Name for the instrument channel
        dac : int
            Number of the dac that this channel corresponds to

        Returns
        ----------

        Raises
        ----------
        """
        super().__init__(parent, name)
        self._api = parent.api
        self._is_ramping = False
        self._ramp_thread = None

        self.add_parameter(
            "current",
            get_cmd=partial(self._get_current, dac),
            set_cmd=partial(self._set_current, dac),
            unit="A",
            vals=validators.Numbers(min_value=-40e-3, max_value=40e-3),
            docstring="Sets the output current of the dac channel. Depending on the value of "
            "ramping_enabled, the output value is either achieved through slowly ramping, or "
            "instantaneously set.",
        )
        self.add_parameter(
            "span",
            val_mapping={"range_max_uni": 0, "range_max_bi": 2, "range_min_bi": 4},
            get_cmd=partial(self._get_span, dac),
            set_cmd=partial(self._api.change_span_update, dac),
            docstring="Sets the max range of the DACs. Possible values:"
            "\t'range_max_uni':\t0 - 40 mA,"
            "\t'range_max_bi':\t-40 - 40 mA,"
            "\t'range_min_bi':\t-20 - 20 mA.",
        )
        self.add_parameter(
            "ramp_rate",
            unit="A/s",
            initial_value=1e-3,  # 1 mA/s
            docstring="Limits the rate at which currents can be changed. The size of of steps is still "
            "limited by `ramp_max_step`.",
            parameter_class=ManualParameter,
        )
        self.add_parameter(
            "ramp_max_step",
            unit="A",
            initial_value=0.5e-3,
            docstring="Sets the maximum step size for current ramping. The rate at which it ramps is set"
            " by `ramp_rate`.",
            parameter_class=ManualParameter,
        )
        self.add_parameter(
            "ramping_enabled",
            initial_value=False,
            vals=validators.Bool(),
            parameter_class=ManualParameter,
            docstring="Turns ramping on or off. Toggling `ramping_enabled` changed the behavior of the "
            "setter for the `current` parameter. If enabled, ramping is done at a rate set by "
            "`ramp_rate` and in steps specified by `ramp_max_step`.",
        )
        self.add_parameter(
            "is_ramping",
            get_cmd=lambda: self._is_ramping,
            set_cmd=False,
            docstring="Returns whether the dac is currently in the process of ramping.",
        )
        self.add_parameter(
            "stepsize",
            unit="A",
            set_cmd=False,
            get_cmd=partial(self._api.get_stepsize, dac),
            docstring="Returns the smallest current step allowed by the dac for the current settings.",
        )

    def _get_span(self, dac: int):
        """
        Gets the span set by the module.

        Parameters
        ----------
        dac : int
            the dac of which to get the span

        Returns
        ----------
        int
            The current span
        """
        _, span = self._api.get_settings(dac)
        return span

    def _get_current(self, dac: int):
        """
        Gets the current set by the module.

        Parameters
        ----------
        dac : int
            the dac of which to get the current

        Returns
        ----------
        float
            The output current as returned by the API's `get_settings`
        """
        current, _ = self._api.get_settings(dac)
        return current

    def _set_current(self, dac: int, val: float):
        """
        Sets the current either through ramping or instantly, depending on `ramping_enabled`.

        Parameters
        ----------
        dac : int
            the dac of which to set the current
        val : float
            The new value of the current

        Returns
        ----------
        """
        if self.ramping_enabled():
            self._set_current_ramp(dac, val)
        else:
            self.set_current_instant(dac, val)

    def _set_current_ramp(self, dac, val):
        """
        Ramps the current in steps set by `ramp_max_step` with a rate set by `ramp_rate`. Ramping is
        non-blocking so the user should check `is_ramping() == False` to see if the final value is
        reached. A ramp that is still in progress is stopped first; the new ramp then starts from
        the value reached so far.

        Parameters
        ----------
        dac : int
            the dac of which to set the current
        val : float
            The new value of the current after ramping

        Returns
        ----------
        """
        # Stop a previous ramp and wait for its worker to finish, so it can neither write a stale
        # value nor clear `_is_ramping` after the new ramp has started.
        previous = self._ramp_thread
        if previous is not None and previous.is_alive():
            previous.exit()
            previous.join()

        val_begin = self.current()
        distance = np.abs(val_begin - val)
        num_steps = int(np.ceil(distance / self.ramp_max_step()))

        if num_steps == 0:
            # Already at the target value: nothing to ramp, and no division by zero below.
            self._is_ramping = False
            return

        # `num_steps` intervals need `num_steps + 1` points. The first point is the present value
        # and is dropped, so no step exceeds `ramp_max_step` and the last value set is `val`.
        vals = np.linspace(val_begin, val, num_steps + 1)[1:]
        dt = distance / self.ramp_rate() / num_steps

        self._is_ramping = True
        th = self._RampThread(self, dac, vals, dt)
        self._ramp_thread = th
        th.start()

    def set_current_instant(self, dac, val):
        """
        Wrapper function around the set_current API call. Instantaneously sets the current.

        Parameters
        ----------
        dac : int
            the dac of which to set the current
        val : float
            The new value of the current

        Returns
        ----------
        """
        self._api.set_current(dac, val)