Source code for qblox_instruments.ieee488_2.ip_transport

# ----------------------------------------------------------------------------
# Description    : Transport layer (abstract, IP, file, dummy)
# Git repository : https://gitlab.com/qblox/packages/software/qblox_instruments.git
# Copyright (C) Qblox BV (2020)
# ----------------------------------------------------------------------------


# -- include -----------------------------------------------------------------

import socket
import os
import sys

from qblox_instruments.ieee488_2 import Transport


# -- class -------------------------------------------------------------------


[docs] class IpTransport(Transport): """ Class for data transport of IP socket. """ # ------------------------------------------------------------------------
[docs] def __init__( self, host: str, port: int = 5025, timeout: float = 60.0, snd_buf_size: int = 512 * 1024, ): """ Create IP socket transport class. Parameters ---------- host : str Instrument IP address. port : int Instrument port. timeout : float Instrument call timeout in seconds. snd_buf_size : int Instrument buffer size for transmissions to instrument. Returns ---------- Raises ---------- """ # 1. Setup timeout (before connecting) # 2. Enlarge buffer # 3. Send immediately # 4. Setup keep alive pinging # 5. Connect self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.settimeout(3) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, snd_buf_size) self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self._set_keepalive() self._socket.connect((host, port)) # Timeout after connecting self._socket.settimeout(timeout)
# ------------------------------------------------------------------------ def _set_keepalive( self, after_idle_sec: int = 60, interval_sec: int = 60, max_fails: int = 5 ) -> None: """ This function instructs the TCP socket to send a heart beat every n seconds to detect dead connections. It's the TCP equivalent of the IRC ping-pong protocol and allows for better cleanup / detection of dead TCP connections. It activates after 60 second (after_idle_sec) of idleness, then sends a keepalive ping once every 60 seconds (interval_sec), and closes the connection after 5 failed ping (max_fails), or 300 seconds by default. Parameters ---------- after_idle_sec : int Activate keepalive after n seconds. interval_sec : int Packet interval in seconds. max_fails : int Maximum number of failed packets. Returns ---------- Raises ---------- """ if os.name == "nt": # Windows after_idle_sec *= 1000 interval_sec *= 1000 # pylint: disable=no-member self._socket.ioctl( socket.SIO_KEEPALIVE_VALS, (1, after_idle_sec, interval_sec) ) elif sys.platform == "darwin": # MacOS TCP_KEEPALIVE = ( 0x10 # From /usr/include, not exported by Python's socket module ) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) self._socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, interval_sec) else: # Linux self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) self._socket.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec ) self._socket.setsockopt( socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec ) self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails) # ------------------------------------------------------------------------ def __del__(self) -> None: """ Delete IP socket transport class. Parameters ---------- Returns ---------- Raises ---------- """ self.close() # ------------------------------------------------------------------------
[docs] def close(self) -> None: """ Close IP socket. Parameters ---------- Returns ---------- Raises ---------- """ self._socket.close()
# ------------------------------------------------------------------------
[docs] def write(self, cmd_str: str) -> None: """ Write command to instrument over IP socket. Parameters ---------- cmd_str : str Command Returns ---------- Raises ---------- """ out_str = cmd_str + "\n" self.write_binary(out_str.encode("ascii"))
# ------------------------------------------------------------------------
[docs] def write_binary(self, data: bytes) -> None: """ Write binary data to instrument over IP socket. Parameters ---------- data : bytes Binary data Returns ---------- Raises ---------- """ exp_len = len(data) act_len = 0 while True: act_len += self._socket.send(data[act_len:exp_len]) if act_len == exp_len: break
# ------------------------------------------------------------------------
[docs] def read_binary(self, size: int) -> bytes: """ Read binary data from instrument over IP socket. Parameters ---------- size : int Number of bytes Returns ---------- bytes Binary data array of length "size". Raises ---------- """ data = self._socket.recv(size) act_len = len(data) exp_len = size while act_len != exp_len: data += self._socket.recv(exp_len - act_len) act_len = len(data) return data
# ------------------------------------------------------------------------
[docs] def readline(self) -> str: """ Read data from instrument over IP socket. Parameters ---------- Returns ---------- str String with data. Raises ---------- """ return self._socket.makefile().readline()