AutoSkill data / radar / self
General SOP for common requests related to data, radar, self.
install
source · Clone the upstream repo
git clone https://github.com/ECNU-ICALK/AutoSkill
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/ECNU-ICALK/AutoSkill "$T" && mkdir -p ~/.claude/skills && cp -r "$T/SkillBank/ConvSkill/english_gpt4_8_GLM4.7/data-radar-self" ~/.claude/skills/ecnu-icalk-autoskill-data-radar-self && rm -rf "$T"
manifest:
SkillBank/ConvSkill/english_gpt4_8_GLM4.7/data-radar-self/SKILL.mdsource content
data / radar / self
General SOP for common requests related to data, radar, self.
Prompt
Follow this SOP (replace specifics with placeholders like <PROJECT>/<ENV>/<VERSION>):
- a2fdf42e0ad0be2f6a734526bebf98a0.json#conv_1
- Use the user questions below as the PRIMARY extraction evidence
- Use the full conversation below as SECONDARY context reference
- In the full conversation section, assistant/model replies are reference-only and not skill evidence
- Primary User Questions (main evidence
- module": "radar_toolbox", "module_structure": "radar_toolbox\conf/capture_conf.json;radar_toolbox\conf/profile_vs1642_exp.cfg;radar_toolbox\conf/raw_bin_to_numpy.json;radar_toolbox\TI_data/sensor_com.py;", "module_files_already_generated_doc": [{"file": "radar_toolbox\fmcw_utils.md", "doc": "# radar_toolbox/fmcw_utils.py Documentation\n\n## Overview\nfmcw_utils.py is a part of the radar_toolbox module and provides utility functions, classes, and constants specifically for frequency modulated continuous wave (FMCW) radar systems. This file is crucial for managing radar models, loading and processing radar data, and preparing captured raw data for analysis.\n\n## Classes\n\n### RadarModel (Enum)\nAn enumeration for the supported radar models.\n\n- IWR1642\n- IWR6843AOP\n- IWRL6432\n- get_model_from_str(string): Static method that takes a string identifier and returns the corresponding RadarModel enum. Raises NotImplementedError if the radar model is not implemented.\n\n### PointsAttributes (Enum)\nEnumeration of possible point cloud attributes\n\n- RANGE\n- AZIMUTH\n- ELEVATION\n- VELOCITY\n- points_attributes_abs(points, attribute_idx): Modifies the given points array by taking the absolute value of the specified attribute.\n\n### AntennaDataSaver\nA helper class for storing calculated FFT data.\n\n- init(save_abs_1D_FFTs, save_abs_2D_FFTs): Constructor specifying whether to save FFT data.\n- add_abs_1D_FFTs(FFTs_1D): Stores one-dimensional FFT results.\n- add_abs_2D_FFTs(FFTs_2D): Stores two-dimensional FFT results.\n\n### RadarDataSaver\nDesigned to store common detection matrices for further analysis.\n\n- init(save_common_detection_matrices): Constructor specifying whether to save common detection matrices.\n- add_common_detection_matrices(common_detection_matrices): Adds a new common detection matrix.\n\n## Functions\n\n### get_phase_compensation_for_radar_model(model)\nReturns the phase compensation array for a specific radar model. Raises NotImplementedError if the radar model is not handled.\n\n### load_IQ_file(path, is_complex, nb_samples, nb_chirps)\nLoads interleaved I/Q data from a file and returns a numpy array reshaped for chirps and samples. Accommodates complex data if is_complex is True.\n\n### load_IQ_data(path_prefix, is_complex, nb_antennas, nb_samples, nb_chirps)\nLoads interleaved I/Q data across multiple antennas and returns a combined numpy array for further processing.\n\n### raw_capture_bin_preparation(capture_bin_file_path, cfg_file_path, radar, output_bin_file_path, verbose)\nProcesses a binary capture file and prepares it in a structured format suitable for analysis, including handling of missing packets.\n\n### prepared_bin_to_numpy(bin_file_path, cfg_file_path, radar, nb_lanes)\nConverts a prepared binary radar data file to numpy format, taking into account missing packets and ensures only complete frames are returned.\n\n### coordinates_from_range_and_angles(azimuths, elevations, ranges)\nCalculates Cartesian coordinates from radar data specified by ranges, azimuths, and elevation angles.\n\n### read_cfg_file(path, radar_model, verbose)\nReads a .cfg file and returns a configuration dictionary with key parameters for the radar operation. Handles different radar models.\n\n### describe_config(config)\nPrints the configuration dictionary in a readable format for better understanding of the radar setup.\n\n### safe_memmap(filename, mode, dtype, shape)\nCreates a memory-mapped file for large arrays ensuring compatibility with the numpy .npy format.\n\n### Frame\nA class managing the construction of radar frames from raw data, handling cross-frame packet scenarios.\n\n- init( ...): Initializes the Frame object, which holds data for a single radar frame.\n- is_complete(): Returns whether the frame is complete.\n- add_values(data_to_append): Appends new data to the current frame.\n- get_frame_values(): Retrieves the final frame values.\n- get_next_frame(): Builds and returns a new frame with the remaining data if the current frame is complete.\n- get_frame_index(): Returns the index of the current frame.\n- get_nb_values_in_frame(): Returns the number of values in a frame.\n- order_frame(flat_unordered_frame): Orders a flat unordered frame into a structured numpy array.\n\n### AntennaConfig\nRepresents the configuration of an FMCW radar antenna, with attributes like start frequency, sampling rate, etc.\n\n- init( ...): Initializes the AntennaConfig instance with radar parameters.\n- get_antenna_parameters_from_config_file(config): Static method that creates an instance of AntennaConfig from radar configuration parameters.\n\n## Data Flow\nThe individual functions and classes deal with various stages of data acquisition, preprocessing, and transformation into formats that can be utilized for subsequent analysis. It goes from reading radar configurations, capturing raw binary data, preprocessing the data, extracting I/Q samples, organizing data into frames, and finally converting into coordinates relevant for point cloud generation.\n\n## Usage\nA developer would typically use these utilities to interface with FMCW radar hardware, manage radar configurations, process captured radar signals, and transform them into more analytical forms for further tasks such as object detection or tracking.\n"}, {"file": "radar_toolbox\utils.md", "doc": "# radar_toolbox/utils.py Documentation\n\n## Overview\n
is a module within theutils.py
that houses utility functions and classes essential for various aspects of the FMCW radar project. These utilities include configuration file handling, data loading, configuration parsing, and more.\n\n## Classes\n\n### GridSize\nA data class representing the size of a grid in three dimensions - x, y, and z.\n\n-radar_toolbox
: Number of grid elements along the x-axis.\n-x
: Number of grid elements along the y-axis.\n-y
: Number of grid elements along the z-axis.\n\n#### Methods\n\n-z
: Returns the shape of the grid as a tuple (x, y, z).\n\n### CaptureBoundaries\nA data class representing the boundaries of a capture session in three dimensions - x, y, and z.\n\n-get_shape()
: Minimum value along the x-axis.\n-x_min
: Maximum value along the x-axis.\n-x_max
: Minimum value along the y-axis.\n-y_min
: Maximum value along the y-axis.\n-y_max
: Minimum value along the z-axis.\n-z_min
: Maximum value along the z-axis.\n\n### Other Utility Functions\n\n-z_max
: Yields successive n-sized chunks from a list.\n-chunks(lst, n)
: Configures an argument parser for handling configuration files.\n-configure_argument_parser_for_conf_file()
: Deep merges two dictionaries.\n-dict_deepmerge(x, y)
: Loads a configuration file, merging it with additional configurations if specified.\n-load_conf(config_file)
: Dynamically loads a class from a module.\n-get_class(cls)
: Loads data from a specified data loader configuration.\n-load_data_from_conf(config_dataloader)
: Creates a PyTorch estimator for model training.\n-create_pytorch_estimator(...)
: Creates a custom estimator for model training.\n-create_custom_estimator(...)
: Constructs the URI for a Docker image.\n-get_image_uri(...)
: Parses extra arguments and adds them to the argument dictionary.\n-parse_extra_args(args, extra_args)
: Generator function for reading a CSV file of a capture session.\n-csv_capture_player(config)
: Constructs callbacks for model training based on specified parameters.\n-get_callbacks_from_params(callbacks_params)
: Retrieves a radar data reader based on the radar model specified in the configuration.\n-get_reader_from_config(config)
: Rotates coordinates anti-clockwise through an angle theta about the origin.\n-coord_rotation(first_axis_values, second_axis_values, theta)
: Corrects inclinations in the width-depth and height-depth planes for given XYZ coordinates.\n\n## Data Flow\nThe utility functions and classes inpoints_inclination_correction(points, horizontal_inclination, vertical_inclination)
contribute to the broader data flow within the FMCW radar project. They handle configuration loading, data preprocessing, model training, and coordinate transformations. The data flow generally involves reading configuration files, loading and processing radar data, and preparing it for analysis or model training.\n\n## Usage\nDevelopers can leverageutils.py
to streamline various aspects of the FMCW radar project. This includes handling configuration files, loading and preprocessing radar data, training machine learning models, and correcting coordinate inclinations."}, {"file": "radar_toolbox\capture_session\main.md", "doc": "# radar_toolbox/capture_session/main.py Documentation\n\n## Overview\nutils.py
is a crucial component of themain.py
module, specifically within theradar_toolbox
submodule. This module is responsible for recording raw radar captures and serves as the entry point for capturing data from the FMCW radar system. The file contains functionalities for configuring the radar, initiating recording sessions, and handling raw data acquisition.\n\n## Functionality\n\n###capture_session
\nThis function generates the file path for saving raw radar data based on the provided parameters. It ensures proper organization of data by creating folders for each person and session type, and it increments the session index to avoid overwriting existing sessions.\n\n### Main Execution\nThe main part of the script is executed when the file is run. It performs the following steps:\n\n1. Argument Parsing: Parses command line arguments, specifically the configuration file path.\n2. Configuration Loading: Loads configuration settings from the specified file.\n3. File Path Generation: Callsget_record_file_path(data_folder, person, session_type)
to determine the file path for saving the raw radar data.\n4. Radar Configuration: Reads the radar model and configuration from the specified configuration file.\n5. Data Reader Initialization: Initializes the data reader based on the radar model and connects to the radar system.\n6. Recording Setup: Configures the radar system for recording, including sending the configuration file.\n7. Data Capture: Initiates the recording process, capturing raw data packets and saving them to the specified file.\n\n### Interrupt Handling\nThe script is designed to handle keyboard interrupts (e.g., user pressing Ctrl+C) gracefully. It ensures that the recording is stopped, and the data reader is disconnected before exiting.\n\n## Dependencies\n- Modules from radar_toolbox:\n -get_record_file_path
: Utilized for radar model enumeration, reading configuration files, and handling radar configurations.\n -fmcw_utils
: Imports theraw_data.adc
class for configuring and interacting with the ADC (Analog-to-Digital Converter).\n -DCA1000
: Utilizes utility functions for configuration file handling and data loading.\n\n## Usage\nTo useutils
, run the script with the necessary command line arguments, such as the path to the configuration file. For example:\nmain.py
\n\nThis script is essential for initiating and managing radar data recording sessions. Developers can customize recording settings, handle configuration files, and interact with the radar system through this module.\n\n## Data Flow\nThe data flow withinbash\npython main.py --conf=conf/capture_conf.json\n
involves configuring the radar system, initiating recording, capturing raw data packets, and saving them to a file. It serves as a critical link between the hardware (FMCW radar) and the subsequent processing stages within the radar toolbox.\n"}, {"file": "radar_toolbox\raw_data\adc.md", "doc": "#main.py
Documentation\n\n## Overview\nradar_toolbox/raw_data/adc.py
is a module within theadc.py
responsible for interfacing with the DCA1000 EVM board via ethernet. This module provides a software interface to control the Analog-to-Digital Converter (ADC) on the DCA1000, enabling the capture of raw radar data. The file includes a classradar_toolbox
that encapsulates the functionality to configure the FPGA, initiate and stop data recording, and read raw data packets from the FPGA.\n\n## Classes\n\n###DCA1000
(Enum)\nAn enumeration of command codes used for communication with the DCA1000 EVM board. The commands include resetting the FPGA, starting/stopping recording, configuring the FPGA, and more.\n\n###CMD
\nThis class represents the software interface to the DCA1000 EVM board. It provides methods to configure the FPGA, start/stop recording, and read raw data packets from the FPGA.\n\n#### Methods\n\n-DCA1000
: Initializes the DCA1000 instance with specified parameters, including radar configuration, IP addresses, and ports.\n\n-__init__(self, radar_config, static_ip='192.168.33.30', adc_ip='192.168.33.180', data_port=4098, config_port=4096, data_port_timeout: float = 1)
: Initializes and connects to the FPGA, configuring various settings.\n\n-configure(self)
: Initiates the recording process on the DCA1000.\n\n-start_record(self)
: Stops the recording process on the DCA1000.\n\n-stop_record(self)
: Closes the sockets used for receiving and sending data.\n\n-close(self)
: Sends a single command to the FPGA and receives the response.\n\n-_send_command(self, cmd, length='0000', body='', timeout=1)
: Helper function to continuously read incoming packets from the DCA1000 in a separate thread.\n\n-_read_incoming_packet(self)
: Retrieves a packet from the packets queue.\n\n-get_packet(self)
: Helper function to read in a single ADC packet via UDP.\n\n###_read_data_packet(self)
\nA subclass ofLiveDCA1000
that includes additional functionality for real-time data processing. It introduces theDCA1000
method, which yields ordered frames when complete orlisten_to_frames
if the current frame times out.\n\n#### Methods\n\n-None
: Initializes the LiveDCA1000 instance with a frame timeout parameter.\n\n-__init__(self, frame_timeout: float, *args, **kwargs)
: Listens to incoming packets and yields ordered frames when complete. Yieldslisten_to_frames(self)
if the current frame times out.\n\n-None
: Reorganizes raw ADC data into a full frame.\n\n## Data Flow\nThe data flow within_organize(self, raw_frame)
involves configuring the FPGA, initiating and stopping recording, and continuously reading incoming packets. For real-time processing, theadc.py
class listens to incoming packets and yields ordered frames when complete.\n\n## Usage\nDevelopers can utilizeLiveDCA1000
to interface with the DCA1000 EVM board, configure the FPGA, and capture raw radar data. The module serves as a crucial link between the FMCW radar hardware and subsequent stages in the radar toolbox.\n"}, {"file": "radar_toolbox\raw_data\raw_bin_to_numpy.md", "doc": "# radar_toolbox/raw_data/raw_bin_to_numpy.py Documentation\n\n## Overview\nadc.py
is a script within theraw_bin_to_numpy.py
module, specifically in theradar_toolbox
submodule. This script reads a raw binary capture file from the FMCW radar, filters and rearranges it into a NumPy array, and then saves it in the NumPy file format. The script is designed to be executed from the command line, with parameters specified in a configuration file (an example is provided:raw_data
).\n\n## Script Execution\nThe script is executed as a standalone program. Below is an example of running the script with command line arguments:\nconf/raw_bin_to_numpy.json
\n\n## Parameters\n-bash\npython raw_bin_to_numpy.py --conf=conf/raw_bin_to_numpy.json\n
: Path to the configuration file (JSON format) that contains the necessary parameters for script execution.\n\n## Dependencies\nThe script relies on the following modules from the--conf
:\n-radar_toolbox
: Utilized for radar model enumeration, reading configuration files, and handling radar configurations.\n-fmcw_utils
: Utilizes utility functions for configuration file handling and data loading.\n\n## Data Processing Steps\n1. Configuration Loading: Reads the configuration settings from the specified configuration file.\n2. Radar Model Initialization: Identifies and initializes the radar model based on the configuration.\n3. Binary File Preparation: Processes the binary capture file, preparing it in a structured format suitable for analysis. This step includes handling missing packets and creating an intermediate binary file (utils
).\n4. NumPy Conversion: Converts the prepared binary radar data file to a NumPy array, considering missing packets and ensuring only complete frames are included.\n5. Data Cleaning: Identifies and processes corrupted frames, deleting them from the dataset.\n6. NumPy File Saving: Saves the cleaned NumPy array to the specified file path.\n\n## Usage\nDevelopers can use this script to convert raw radar binary data into a NumPy array for further analysis or processing. The script is an integral part of the data preprocessing pipeline in the FMCW radar project.\n\n## Note\n- Developers can customize the configuration file (_preparation.bin
) to adapt the script to specific radar configurations and file paths.\n- The script ensures the integrity of the captured radar data by handling missing packets and corrupted frames during the conversion process.\n- The cleaned and processed data is saved in a NumPy file, providing a convenient format for subsequent data analysis and manipulation."}, {"file": "radar_toolbox\raw_data\record_raw_data.md", "doc": "#conf/raw_bin_to_numpy.json
Documentation\n\n## Overview\nradar_toolbox/raw_data/record_raw_data.py
is a script within therecord_raw_data.py
module, specifically in theradar_toolbox
submodule. This script is responsible for recording raw radar data from the FMCW radar system. It interfaces with the radar system, configures it based on specified parameters, and captures raw data packets. The script is designed to be executed from the command line, and its execution is initiated by running the script with a configuration file as a command line argument.\n\n## Script Execution\nThe script is executed as a standalone program. Below is an example of running the script with command line arguments:\n\nraw_data
\n\n### Parameters\n-bash\npython record_raw_data.py --conf=conf/capture_conf.json\n
: Path to the configuration file (JSON format) that contains the necessary parameters for script execution.\n\n## Dependencies\nThe script relies on the following modules from the--conf
:\n-radar_toolbox
: Utilized for radar model enumeration, reading configuration files, and handling radar configurations.\n-fmcw_utils
: Imports theraw_data.adc
class for configuring and interacting with the ADC (Analog-to-Digital Converter).\n-DCA1000
: Utilizes utility functions for configuration file handling and data loading.\n\n## Functionality\nThe script performs the following key functionalities:\n\n1. Argument Parsing: Parses command line arguments, specifically the configuration file path.\n2. Configuration Loading: Loads configuration settings from the specified file.\n3. Radar Model Initialization: Identifies and initializes the radar model based on the configuration.\n4. Data Reader Initialization: Initializes the data reader based on the radar model and connects to the radar system.\n5. Radar Configuration: Reads the radar model and configuration from the specified configuration file.\n6. Recording Setup: Configures the radar system for recording, including sending the configuration file.\n7. Data Capture: Initiates the recording process, capturing raw data packets and saving them to the specified file.\n\n### Interrupt Handling\nThe script is designed to handle keyboard interrupts (e.g., user pressing Ctrl+C) gracefully. It ensures that the recording is stopped, and the data reader is disconnected before exiting.\n\n## Usage\nTo useutils
, run the script with the necessary command line arguments, such as the path to the configuration file. For example:\n\nrecord_raw_data.py
\n\nThis script is essential for initiating and managing radar data recording sessions. Developers can customize recording settings, handle configuration files, and interact with the radar system through this module.\n\n## Data Flow\nThe data flow withinbash\npython record_raw_data.py --conf=conf/capture_conf.json\n
involves configuring the radar system, initiating recording, capturing raw data packets, and saving them to a file. It serves as a critical link between the hardware (FMCW radar) and the subsequent processing stages within the radar toolbox."}, {"file": "radar_toolbox\TI_data\record_ti_data.md", "doc": "# radar_toolbox/TI_data/record_ti_data.py Documentation\n\n## Overview\nrecord_raw_data.py
is a script within therecord_ti_data.py
module, specifically in theradar_toolbox
submodule. This script is designed to read data in real-time from the mmWave radar board. It interfaces with the radar system, configures it based on specified parameters, and captures raw data points. The script allows for saving the captured data to a CSV file for further analysis.\n\n## Script Execution\nThe script is executed as a standalone program. Below is an example of running the script with command line arguments:\n\nTI_data
\n\n### Parameters\n-bash\npython record_ti_data.py --conf=conf/capture_conf.json\n
: Path to the configuration file (JSON format) that contains the necessary parameters for script execution.\n\n## Dependencies\nThe script relies on the following modules from the--conf
:\n-radar_toolbox
: Utilized for radar model enumeration, reading configuration files, and handling radar configurations.\n-fmcw_utils
: Utilizes utility functions for configuration file handling and data loading.\n\n## Functionality\nThe script performs the following key functionalities:\n\n1. Argument Parsing: Parses command line arguments, specifically the configuration file path.\n2. Configuration Loading: Loads configuration settings from the specified file.\n3. Radar Model Initialization: Identifies and initializes the radar model based on the configuration.\n4. Data Reader Initialization: Initializes the data reader based on the radar model and connects to the radar system.\n5. Radar Configuration: Reads the radar model and configuration from the specified configuration file.\n6. Data Capture: Initiates the recording process, capturing raw data points and storing them in memory.\n7. Interrupt Handling: The script is designed to handle keyboard interrupts (e.g., user pressing Ctrl+C) gracefully. It ensures that the data reader is disconnected before exiting.\n\n## CSV Data Saving\nThe script includes a functionutils
that saves the captured radar data to a CSV file. The CSV file contains columns for timestamp (save_capture_to_csv
), and Cartesian coordinates (t
,x
,y
). If velocity information is available, the file also includes a column for velocity (z
).\n\n## Usage\nTo usev
, run the script with the necessary command line arguments, such as the path to the configuration file. For example:\n\nrecord_ti_data.py
\n\nThis script is essential for capturing real-time radar data from the mmWave radar board. Developers can customize recording settings, handle configuration files, and interact with the radar system through this module.\n\n## Data Flow\nThe data flow withinbash\npython record_ti_data.py --conf=conf/capture_conf.json\n
involves configuring the radar system, initiating real-time data capture, and storing the captured points in memory. The script serves as a crucial link between the FMCW radar hardware and subsequent stages in the radar toolbox."}], "other_modules_doc": null, "gen_doc_of_file": {"path": "radar_toolbox\TI_data\sensor_com.py", "content": "import struct\nimport time\nfrom abc import ABC, abstractmethod\nfrom queue import Queue\nfrom threading import Thread\n\nimport numpy as np\nimport serial\n\nfrom radar_toolbox.TI_data.exceptions import BufferOverFlowError, DataPortNotOpenError, GeneralMmWError\nfrom radar_toolbox.fmcw_utils import AntennaConfig\n\n\nclass Reader(ABC):\n MAX_BUFFER_SIZE = 2 ** 15\n DATA_CHUNK_SIZE = 32\n\n def init(self, u_port=None, d_port=None):\n self.user_port = u_port\n self.data_port = d_port\n\n self.data_queue = Queue(maxsize=1_000_000)\n self.stop_reading_flag = True\n self.reader_thread = Thread(target=self._read_incoming_data)\n\n def connect(self, cli_port: str, data_port: str):\n """\n Function to configure the serial ports and send the data from the configuration file to the radar.\n\n :param cfg_file_path: path of the .cfg file to use for radar configuration.\n :param cli_port: port where cli commands will be send.\n :param data_port: port where data will be received.\n """\n # Open the serial ports for the configuration and the data ports\n\n self.user_port = serial.Serial(cli_port, 115200)\n self.data_port = serial.Serial(data_port, 921600)\n\n def send_config(self, cfg_file_path: str):\n """\n Function to send the data from the configuration file to the radar.\n\n :param cfg_file_path: path of the .cfg file to use for radar configuration.\n """\n # Read the configuration file and send it to the board\n config = [line.rstrip('\r\n') for line in open(cfg_file_path)]\n for line in config:\n self.user_port.write((line + '\n').encode())\n print(line)\n time.sleep(0.01)\n\n def start_sensor(self, listen_to_data: bool = False):\n if listen_to_data:\n self.stop_reading_flag = False\n self.reader_thread.start()\n self.user_port.write(('sensorStart\n').encode())\n time.sleep(0.01)\n print(self.user_port.read(self.user_port.in_waiting))\n print("Sensor Start")\n\n def stop_sensor(self):\n self.stop_reading_flag = True\n self.user_port.write(('sensorStop\n').encode())\n time.sleep(0.01)\n print("Sensor Stop")\n\n def disconnect(self):\n self.stop_sensor()\n self.user_port.close()\n self.data_port.close()\n print("Sensor disconnected.")\n\n def _read_incoming_data(self):\n while True:\n if self.stop_reading_flag:\n break\n if self.data_queue.full():\n self.data_queue.get()\n self.data_queue.put(self.data_port.read(self.DATA_CHUNK_SIZE))\n\n @abstractmethod\n def parse_stream(self, only_points: bool = True):\n pass\n\n\nclass Reader16XX(Reader):\n MAGIC_WORD = [2, 1, 4, 3, 6, 5, 8, 7]\n OBJ_STRUCT_SIZE_BYTES = 12\n MMWDEMO_UART_MSG_DETECTED_POINTS = 1\n MMWDEMO_UART_MSG_RANGE_PROFILE = 2\n\n def init(self, antenna_config: AntennaConfig, *args, **kwargs):\n super().init(*args, **kwargs)\n self.antenna_config = antenna_config\n # The maximum relative speed that can be measured by two chirps spaced Tc apart\n doppler_velocities = (antenna_config.center_wavelength / 2) * np.fft.fftfreq(antenna_config.nb_loops,\n d=antenna_config.loop_duration)\n # Constants\n self.dopplerResolutionMps = doppler_velocities[1] - doppler_velocities[0]\n self.rangeIdxToMeters = 299792458 / (2 * antenna_config.B) # Range resolution\n\n self.byte_buffer = np.zeros(Reader.MAX_BUFFER_SIZE, dtype='uint8')\n self.byte_buffer_length = 0\n\n def parse_stream(self, only_points: bool = True):\n """\n Reads and parses the incoming data.\n\n :param only_points: Should the returned data contain only (x, y, z) points or also all other informations\n (range, doppler and peaks) ? If False, only number of objects, x, y and z will be returned (process time\n saved).\n :return: dictionary of received elements\n """\n\n # Initialize variables\n magicOK = 0 # Checks if magic number has been read\n result = {"data_ok": False} # Checks if the data has been read correctly\n\n byteVec = np.frombuffer(self.data_queue.get(), dtype='uint8')\n byteCount = len(byteVec)\n\n # Check that the buffer is not full, and then add the data to the buffer\n if (self.byte_buffer_length + byteCount) < Reader.MAX_BUFFER_SIZE:\n self.byte_buffer[self.byte_buffer_length:self.byte_buffer_length + byteCount] = byteVec[:byteCount]\n self.byte_buffer_length += byteCount\n\n # Check that the buffer has some data\n if self.byte_buffer_length > 16:\n\n # Check for all possible locations of the magic word\n possibleLocs = np.where(self.byte_buffer == self.MAGIC_WORD[0])[0]\n\n # Confirm that is the beginning of the magic word and store the index in startIdx\n startIdx = []\n for loc in possibleLocs:\n check = self.byte_buffer[loc:loc + 8]\n if np.all(check == self.MAGIC_WORD):\n startIdx.append(loc)\n\n # Check that startIdx is not empty\n if startIdx:\n # Remove the data before the first start index\n if 0 < startIdx[0] < self.byte_buffer_length:\n self.byte_buffer[:self.byte_buffer_length - startIdx[0]] = self.byte_buffer[\n startIdx[0]:self.byte_buffer_length]\n self.byte_buffer[self.byte_buffer_length - startIdx[0]:] = np.zeros(\n len(self.byte_buffer[self.byte_buffer_length - startIdx[0]:]), dtype='uint8')\n self.byte_buffer_length = self.byte_buffer_length - startIdx[0]\n\n # Check that there have no errors with the byte buffer length\n if self.byte_buffer_length < 0:\n self.byte_buffer_length = 0\n\n # Read the total packet length\n totalPacketLen = int.from_bytes(self.byte_buffer[12:12 + 4], "little")\n # TODO utiliser conversion en little endian, plus efficace.\n\n # Check that all the packet has been read\n if (self.byte_buffer_length >= totalPacketLen) and (self.byte_buffer_length != 0):\n magicOK = 1\n\n # If magicOK is equal to 1 then process the message\n if magicOK:\n # Initialize the pointer index\n idX = 0\n\n # Read the header\n magicNumber = self.byte_buffer[idX:idX + 8]\n idX += 8\n version = format(int.from_bytes(self.byte_buffer[idX:idX + 4], "little"), 'x')\n idX += 4\n totalPacketLen = int.from_bytes(self.byte_buffer[idX:idX + 4], "little")\n idX += 4\n platform = format(int.from_bytes(self.byte_buffer[idX:idX + 4], "little"), 'x')\n idX += 4\n result["frame_num"] = int.from_bytes(self.byte_buffer[idX:idX + 4], "little")\n idX += 4\n timeCpuCycles = int.from_bytes(self.byte_buffer[idX:idX + 4], "little", signed=True)\n idX += 4\n numDetectedObj = int.from_bytes(self.byte_buffer[idX:idX + 4], "little")\n idX += 4\n numTLVs = int.from_bytes(self.byte_buffer[idX:idX + 4], "little")\n idX += 4\n subFrameNumber = int.from_bytes(self.byte_buffer[idX:idX + 4], "little")\n idX += 4\n\n # Read the TLV messages\n for tlvIdx in range(numTLVs):\n # Check the header of the TLV message\n tlv_type = int.from_bytes(self.byte_buffer[idX:idX + 4], "little")\n idX += 4\n tlv_length = int.from_bytes(self.byte_buffer[idX:idX + 4], "little")\n idX += 4\n\n # Read the data depending on the TLV message\n if tlv_type == self.MMWDEMO_UART_MSG_DETECTED_POINTS:\n tlv_numObj = int.from_bytes(self.byte_buffer[idX:idX + 2], "little")\n idX += 2\n tlv_xyzQFormat = 2 ** int.from_bytes(self.byte_buffer[idX:idX + 2], "little")\n idX += 2\n\n # Initialize the arrays\n if not only_points:\n rangeIdx = np.zeros(tlv_numObj, dtype='int16')\n dopplerIdx = np.zeros(tlv_numObj, dtype='int16')\n peakVal = np.zeros(tlv_numObj, dtype='int16')\n detected_points = np.zeros((tlv_numObj, 3), dtype='int16')\n\n for objectNum in range(tlv_numObj):\n # Read the data for each object\n if not only_points:\n rangeIdx[objectNum] = int.from_bytes(self.byte_buffer[idX:idX + 2], "little")\n idX += 2\n dopplerIdx[objectNum] = int.from_bytes(self.byte_buffer[idX:idX + 2], "little")\n idX += 2\n peakVal[objectNum] = int.from_bytes(self.byte_buffer[idX:idX + 2], "little")\n idX += 2\n else:\n idX += 6\n detected_points[objectNum, 0] = int.from_bytes(self.byte_buffer[idX:idX + 2], "little",\n signed=True)\n idX += 2\n detected_points[objectNum, 1] = int.from_bytes(self.byte_buffer[idX:idX + 2], "little",\n signed=True)\n idX += 2\n detected_points[objectNum, 2] = int.from_bytes(self.byte_buffer[idX:idX + 2], "little",\n signed=True)\n idX += 2\n\n detected_points = detected_points / tlv_xyzQFormat\n result["detected_points"] = detected_points\n\n # Store the data in the result dictionary\n if not only_points:\n # Make the necessary corrections and calculate the rest of the data\n range_val = rangeIdx * self.rangeIdxToMeters\n dopplerIdx[dopplerIdx > (self.antenna_config.nb_loops / 2 - 1)] = dopplerIdx[dopplerIdx > (\n self.antenna_config.nb_loops / 2 - 1)] - 65535\n doppler_val = dopplerIdx * self.dopplerResolutionMps\n result["range_idx"] = rangeIdx\n result["range_profile"] = range_val\n result["doppler_idx"] = dopplerIdx\n result["doppler_profile"] = doppler_val\n result["peak_values"] = peakVal\n result["data_ok"] = True\n\n # Remove already processed data\n if idX > 0 and self.byte_buffer_length > idX:\n shiftSize = totalPacketLen\n\n self.byte_buffer[:self.byte_buffer_length - shiftSize] = self.byte_buffer[\n shiftSize:self.byte_buffer_length]\n self.byte_buffer[self.byte_buffer_length - shiftSize:] = np.zeros(\n len(self.byte_buffer[self.byte_buffer_length - shiftSize:]),\n dtype='uint8')\n self.byte_buffer_length = self.byte_buffer_length - shiftSize\n\n # Check that there are no errors with the buffer length\n if self.byte_buffer_length < 0:\n self.byte_buffer_length = 0\n\n return result\n\n\nclass Reader68XX(Reader):\n\n def init(self, *args, **kwargs):\n super().init(*args, **kwargs)\n self.byte_buffer = b''\n\n def _tlvHeaderDecode(self, data):\n tlvType, tlvLength = struct.unpack('2I', data)\n return tlvType, tlvLength\n\n def _parseDetectedObjects(self, data, numObj, tlvLength):\n detected_points = struct.unpack(str(numObj * 4) + 'f', data[:tlvLength])\n return np.asarray(detected_points).reshape(numObj, 4)\n\n def _parseDetectedTarget(self, data):\n tid, posX, posY, posZ, velX, velY, velZ, accX, accY, accZ = struct.unpack('I9f', data[0:40])\n # ec = struct.unpack('16f', data[40:104])\n # g, cl = struct.unpack('2f', data[104:112])\n return posX, posY, posZ\n\n def _parseDetectedPointsCompressed(self, data, numObj, tlvLength):\n xyzUnit, dopplerUnit, snrUnit, noiseUnit, numDetectedPoints1, numDetectedPoints2 = struct.unpack('4f2H',\n data[:20])\n data = data[20:]\n detected_points = np.empty((numObj, 4), dtype=float)\n for i in range(numObj):\n detected_points[i] = struct.unpack('4H', data[:8])\n # doppler, snr, noise = struct.unpack('2B', data[8:10])\n data = data[10:]\n detected_points[:, :3] *= xyzUnit\n detected_points[:, 3] *= dopplerUnit\n return detected_points\n\n def _parseRangeProfile(self, data, tlvLength):\n # an integer is 2 byte long\n range_bins = tlvLength / 2\n range_profile = struct.unpack(str(int(range_bins)) + 'H', data[:tlvLength])\n return range_profile\n\n def _parseRDheatmap(self, data, tlvLength, range_bins, rm_clutter=True):\n """\n range bins times doppler bins times 2, doppler bins = chirps/ frame divided by num of antennas TX (3)\n #default chirps per frame is (128/3) = 42 * 2 * 256\n\n the call to replace_left_right mirror-flips left and right after reshaping.\n replace_left_right is equivalent to this line from mmWave.js in the visualizer code\n # rangeDoppler = rangeDoppler.slice((rangeDoppler.length + 1) / 2).concat(\n # rangeDoppler.slice(0, (rangeDoppler.length + 1) / 2));\n\n :param range_bins:\n :param data: the incoming byte stream to be interpreted as range-doppler heatmap/profile\n :param tlvLength:\n :return:\n """\n doppler_bins = (tlvLength / 2) / range_bins\n rd_heatmap = struct.unpack(str(int(range_bins * doppler_bins)) + 'H', data[:tlvLength])\n rd_heatmap = np.reshape(rd_heatmap, (int(range_bins), int(doppler_bins)))\n\n overall_mean = np.mean(rd_heatmap)\n if rm_clutter:\n rd_heatmap = np.array([row - np.mean(row) for row in rd_heatmap])\n\n return self._replace_left_right(rd_heatmap)\n\n def _chg_val(self, val):\n return val - 65536 if val > 32767 else val\n\n def _parseAziheatmap(self, data, tlvLength, range_bins):\n """\n :param range_bins:\n :param data: the incoming byte stream to be interpreted as range-doppler heatmap/profile\n :param tlvLength:\n :return:\n """\n # range_bins = 256\n azi_bins = (tlvLength / 2) / range_bins\n azi_heatmap = struct.unpack(str(int(range_bins * azi_bins)) + 'H', data[:tlvLength])\n # azi_heatmap = [self._chg_val(x) for x in azi_heatmap]\n azi_heatmap = np.reshape(azi_heatmap, (int(range_bins), int(azi_bins)))\n\n # use the default order of 3 Tx's and ordering is TX0, TX1, TX2\n row_indices = [7, 5, 11, 9]\n qrows = 4\n qcols = range_bins\n rowSizeBytes = 48\n q = data[:tlvLength]\n qq = []\n for col in range(qcols):\n real = []\n img = []\n for row in range(qrows):\n index = col * rowSizeBytes + 4 * row_indices[row]\n real.append(q[index + 1] * 256 + q[index])\n img.append(q[index + 3] * 256 + q[index + 2])\n real = [self._chg_val(x) for x in real]\n img = [self._chg_val(x) for x in img]\n # convert to complex numbers\n data = np.array([real, img]).transpose()\n data = np.pad(data, ((0, 60), (0, 0)), 'constant', constant_values=0)\n data = data[..., 0] + 1j * data[..., 1]\n transformed = np.fft.fft(data)\n\n # take the magnitude\n transformed = np.absolute(transformed)\n qq.append(\n np.concatenate((transformed[int(len(transformed) / 2):], transformed[:int(len(transformed) / 2)])))\n qq = np.array(qq)\n return qq\n\n def _replace_left_right(self, a):\n rtn = np.empty(shape=a.shape)\n rtn[:, :int(rtn.shape[1] / 2)] = a[:, int(rtn.shape[1] / 2):]\n rtn[:, int(rtn.shape[1] / 2):] = a[:, :int(rtn.shape[1] / 2)]\n return rtn\n\n def _parseStats(self, data):\n interProcess, transmitOut, frameMargin, chirpMargin, activeCPULoad, interCPULoad = struct.unpack('6I',\n data[:24])\n return interProcess, transmitOut, frameMargin, chirpMargin, activeCPULoad, interCPULoad\n\n def _decode_iwr_tlv(self, in_data):\n """\n Must disable range profile for the quick RD heatmap to work, this way the number of range bins will be calculated\n from the absent range profile. You can still get the range profile by inferring it from the RD heatmap\n """\n magic = b'\x02\x01\x04\x03\x06\x05\x08\x07'\n header_length = 36\n result = {"data_ok": False}\n\n offset = in_data.find(magic)\n data = in_data[offset:]\n if len(data) < header_length:\n return result\n try:\n data_magic, version, length, platform, result["frame_num"], cpuCycles, numObj, numTLVs = struct.unpack(\n 'Q7I', data[:header_length])\n except struct.error:\n print("Improper TLV structure found: ", (data,))\n return result\n # print("Packet ID:\t%d "%(frameNum))\n # print("Version:\t%x "%(version))\n # print("Data Len:\t\t%d", length)\n # print("TLV:\t\t%d "%(numTLVs))\n # print("Detect Obj:\t%d "%(numObj))\n # print("Platform:\t%X "%(platform))\n if version >= 50462726 and len(data) >= length:\n # if version > 0x01000005 and len(data) >= length:\n try:\n sub_frame_num = struct.unpack('I', data[36:40])[0]\n header_length = 40\n pending_bytes = length - header_length\n data = data[header_length:]\n\n result["detected_points"] = None\n result["detected_targets"] = []\n result["range_profile"] = None\n result["rd_heatmap"] = None\n result["azi_heatmap"] = None\n range_bins = 8\n statistics = None\n\n for i in range(numTLVs):\n tlvType, tlvLength = self._tlvHeaderDecode(data[:8])\n data = data[8:]\n if tlvType == 1:\n result["detected_points"] = self._parseDetectedObjects(data, numObj,\n tlvLength) # if no detected points, tlvType won't have 1\n elif tlvType == 301:\n result["detected_points"] = self._parseDetectedPointsCompressed(data, numObj, tlvLength)\n elif tlvType == 308:\n posX, posY, posZ = self._parseDetectedTarget(data)\n result["detected_targets"].append([posX, posY, posZ])\n elif tlvType == 2:\n # the range bins is modified if the range profile is enabled\n result["range_profile"] = self._parseRangeProfile(data, tlvLength)\n\n elif tlvType == 4:\n # resolving static azimuth heatmap\n pass\n elif tlvType == 5:\n # try:\n # assert range_bins\n # except AssertionError:\n # raise Exception('Must enable range-profile while enabling range-doppler-profile, in order to'\n # 'interpret the number of range bins')\n result["rd_heatmap"] = self._parseRDheatmap(data, tlvLength, range_bins)\n elif tlvType == 6:\n interProcess, transmitOut, frameMargin, chirpMargin, activeCPULoad, interCPULoad = self._parseStats(\n data)\n pass\n elif tlvType == 7:\n pass\n elif tlvType == 8:\n # resolving static azimuth-elevation heatmap\n try:\n result["azi_heatmap"] = self._parseAziheatmap(data, tlvLength, range_bins)\n except:\n print('bad azimuth')\n result["azi_heatmap"] = None\n pass\n elif tlvType == 9: # only for AoP EV2\n pass\n else:\n # print("Unidentified tlv type %d" % tlvType, '. Its len is ' + str(tlvLength))\n n_offset = data.find(magic)\n if n_offset != offset and n_offset != -1:\n print(f'New magic found, discarding previous frame with unknown tlv {tlvType}')\n result["leftover_data"] = data[n_offset:]\n result["data_ok"] = True\n return result\n data = data[tlvLength:]\n pending_bytes -= (8 + tlvLength)\n result["leftover_data"] = data[pending_bytes:] # data that are left\n # infer range profile from heatmap is the former is not enabled\n if result["range_profile"] is None and result["rd_heatmap"] is not None and len(\n result["rd_heatmap"]) > 0:\n result["range_profile"] = result["rd_heatmap"][:, 0]\n result["data_ok"] = True\n return result\n except struct.error as se:\n print('Failed to parse tlv message, type = ' + str(tlvType) + ', error: ')\n print(se)\n pass\n\n return result\n\n def parse_stream(self, only_points: bool = True): # TODO : handle only_points parameter\n try:\n self.byte_buffer += self.data_queue.get()\n\n if len(self.byte_buffer) > self.MAX_BUFFER_SIZE:\n raise BufferOverFlowError\n\n result = self._decode_iwr_tlv(self.byte_buffer)\n\n if result["data_ok"]:\n self.byte_buffer = b'' + result["leftover_data"]\n return result\n except (serial.serialutil.SerialException, AttributeError, TypeError, ValueError) as e:\n if type(e) == serial.serialutil.SerialException:\n raise DataPortNotOpenError\n else:\n raise GeneralMmWError\n\n\nclass Reader64XX(Reader68XX):\n\n def connect(self, cli_port: str, data_port: str):\n """\n Function to configure the serial ports and send the data from the configuration file to the radar.\n\n :param cfg_file_path: path of the .cfg file to use for radar configuration.\n :param cli_port: port where cli commands will be send.\n :param data_port: port where data will be received.\n """\n # Open the serial ports for the configuration and the data ports\n\n self.user_port = serial.Serial(cli_port, 115200)\n self.data_port = self.user_port\nrecord_ti_data.py - Instructions
- The project focuses on the use of FMCW radar
- It is used for data collection, point detection, tracking and other functions
- The project is broken down into several modules
For each step, include: action, checks, and failure rollback/fallback plan. Output format: for each step number, provide status/result and what to do next.
Triggers
- Use when the user asks for a process or checklist.
- Use when you want to reuse a previously mentioned method/SOP.
Examples
Example 1
Input:
Break this into best-practice, executable steps.