Loading...
Searching...
No Matches
writeinCSV.py

This is an example of the writeinCSV.py file, which helps control Squidstat in parallel with other devices.

This is an example of the writeinCSV.py file, which helps control Squidstat in parallel with other devices. In this example, we notify the other device using SerialPortReader::writeData when a new element starts executing inside Squidstat, and also print the data received from the other device using the SerialPortReader::dataReceived signal. All operations occur in parallel with the Squidstat operation.

The Squidstat data is also written to a CSV file.

In detail:

  1. The SerialPortReader class handles reading from and writing to the serial port of the other device.
  2. The WriteCSV class is responsible for writing the data received from Squidstat to a CSV file.
  3. The writingThread class is used to manage writing the data to the CSV file in a separate thread.
"""! @example writeinCSV.py
This is an example of the writeinCSV.py file, which helps control Squidstat in parallel with other devices.
In this example, we notify the other device using SerialPortReader::writeData when a new element starts executing
inside Squidstat, and also print the data received from the other device using the SerialPortReader::dataReceived signal.
All operations occur in parallel with the Squidstat operation.

The Squidstat data is also written to a CSV file.

In detail:
1. The `SerialPortReader` class handles reading from and writing to the serial port of the other device.
2. The `WriteCSV` class is responsible for writing the data received from Squidstat to a CSV file.
3. The `writingThread` class is used to manage writing the data to the CSV file in a separate thread.
"""
14
import csv
import io
import sys

from PySide6.QtCore import QIODevice, QThread, QObject, Signal
from PySide6.QtSerialPort import QSerialPort
from PySide6.QtWidgets import QApplication
from SquidstatPyLibrary import AisDeviceTracker
from SquidstatPyLibrary import AisErrorCode
from SquidstatPyLibrary import AisExperiment
from SquidstatPyLibrary import AisConstantCurrentElement
from SquidstatPyLibrary import AisOpenCircuitElement
24
# initialize the Qt application; its event loop is started at the bottom of
# the script with app.exec()
app = QApplication([])
27
# convert incoming data to a single CSV-formatted line (without a line ending)
def convert_to_csv_line(data_list):
    """Return the items of ``data_list`` as one CSV record.

    Every item is converted with ``str()`` and the fields are joined with
    commas. Fields that contain a comma, a double quote or a line break are
    quoted by the standard ``csv`` writer, so such values can no longer
    shift or split columns in the output file.

    Args:
        data_list: Iterable of values to place on the line.

    Returns:
        The CSV record as a string, without a trailing line terminator.
    """
    buffer = io.StringIO()
    csv.writer(buffer).writerow([str(item) for item in data_list])
    # The writer always appends its "\r\n" terminator; callers add their own
    # newline, so it is removed here. A field that itself ends in a line
    # break is quoted and therefore ends in '"', so no data is stripped.
    return buffer.getvalue().rstrip('\r\n')
31
# \cond EXCLUDE_FROM_DOX
# class for reading and writing data from serial port of other device.
class SerialPortReader(QObject):
    """Read text from, and write text to, the serial port it is given.

    Incoming bytes are decoded and published through the ``dataReceived``
    signal; ``writeData`` sends an encoded string to the same port.
    """

    # Qt signal carrying each chunk of decoded text read from the port.
    dataReceived = Signal(str)

    def __init__(self, port):
        """Store the serial port object used by all other methods.

        Args:
            port: The serial port object (a ``QSerialPort`` in this script).
        """
        super().__init__()
        self.port = port

    def run(self):
        """Read from the port until it is closed, emitting what arrives.

        The port is opened if necessary. It is opened read/write rather than
        read-only so that ``writeData`` can use the same open port.
        """
        if not self.port.isOpen():
            self.port.open(QIODevice.ReadWrite)
        # If opening failed the port is still closed and the loop is skipped.
        while self.port.isOpen():
            # Blocks until data is available or Qt's default timeout elapses.
            if self.port.waitForReadyRead():
                # Undecodable bytes are replaced instead of raising, so one
                # corrupted chunk does not terminate the read loop.
                data = self.port.readAll().data().decode(errors="replace")
                self.dataReceived.emit(data)

    def writeData(self, data):
        """Encode ``data`` and write it to the port, opening it if needed.

        Args:
            data: The string to send.
        """
        if not self.port.isOpen():
            # Open read/write so a later run() can still read from the port.
            successfullyOpen = self.port.open(QIODevice.ReadWrite)
            if not successfullyOpen:
                print("USB port is not open.")
                # Nothing can be written to a port that failed to open.
                return
        self.port.write(data.encode())

    def closePort(self):
        """Close the port if it is currently open."""
        if self.port.isOpen():
            self.port.close()
67
68
# class for writing data to a csv file. data received from Squidstat.
class WriteCSV:
    """Write a header line and data rows to a CSV file.

    The file is opened by the first ``write_header`` call; ``write_data``
    is silently ignored while no file is open.
    """

    def __init__(self, filename):
        """Remember the target file name; the file is not opened yet.

        Args:
            filename: Path of the CSV file to create.
        """
        self.filename = filename
        # Open file object, or None while the file is not open.
        self.file = None

    def write_header(self, header):
        """Open the file for writing and write the header line.

        Does nothing if the file is already open.

        Args:
            header: Iterable of column names.
        """
        if self.file is None:
            self.file = open(self.filename, 'w')
            self.file.write(convert_to_csv_line(header) + '\n')

    def write_data(self, data):
        """Append one row to the file; ignored while no file is open.

        Args:
            data: Iterable of values for the row.
        """
        if self.file is not None:
            self.file.write(convert_to_csv_line(data) + '\n')

    def close(self):
        """Close the file if it is open and forget the handle.

        Resetting ``self.file`` makes repeated ``close`` calls and late
        ``write_data`` calls harmless instead of touching a closed file.
        """
        if self.file is not None:
            self.file.close()
            self.file = None
91
92
# class to handle the write functionality on a separate thread, in parallel with the operation of Squidstat and the other device.
class writingThread(QThread):
    """Collect (timestamp, voltage) samples and forward them to a CSV writer.

    Samples are delivered by emitting ``writeData``; emitting ``stopTowrite``
    closes the CSV writer.

    NOTE(review): ``run`` returns right after writing the header, so the
    worker thread presumably finishes immediately and the slots execute in
    the thread that owns this object - confirm against Qt's connection rules.
    """

    # define the signals.
    writeData = Signal(float, float)
    stopTowrite = Signal()

    def __init__(self, csv_writer):
        """Store the writer and wire the signals to their handlers.

        Args:
            csv_writer: Object providing ``write_header``, ``write_data`` and
                ``close`` (a ``WriteCSV`` in this script).
        """
        super().__init__()
        self.timestamps = []
        self.voltages = []
        self.csv_writer = csv_writer
        # Connect once, at construction, so the handlers are in place before
        # any signal can be emitted and are not re-connected by each start.
        self.writeData.connect(self.add_data)
        self.stopTowrite.connect(self.close)

    def run(self):
        """Thread entry point: create the data file and write its header."""
        self.csv_writer.write_header(['Timestamp', 'Working Electrode Voltage'])

    def add_data(self, timestamp, voltage):
        """Record one sample in memory and write it to the CSV file.

        Args:
            timestamp: Sample timestamp.
            voltage: Working electrode voltage of the sample.
        """
        self.timestamps.append(timestamp)
        self.voltages.append(voltage)
        self.csv_writer.write_data([timestamp, voltage])

    def close(self):
        """Close the CSV writer."""
        self.csv_writer.close()
# \endcond
122
# setup serial port of other device.
serialPort = QSerialPort("COM3")
# setup baud rate of other device.
serialPort.setBaudRate(QSerialPort.Baud9600)
# set up the number of data bits used by the other device.
serialPort.setDataBits(QSerialPort.Data8)
# define the serial port reader that wraps this port.
serialPortReader = SerialPortReader(serialPort)
131
132
# function to write data to the serial port of other device.
def writeDataToPort(data):
    """Send ``data`` to the other device through the module-level reader.

    Args:
        data: The string to write to the serial port.
    """
    serialPortReader.writeData(data)
136
# instantiate a Squidstat device tracker
tracker = AisDeviceTracker.Instance()
139
# interact with data and send experiments to Squidstat device
def onNewDeviceConnected(deviceName):
    """Set up data handling and run an experiment on a newly connected device.

    Connects the device's data signals to console output and to the CSV
    writing thread, then builds an experiment (one constant-current element
    followed by an open-circuit element run twice), uploads it to channel
    index 0 and starts it. On any failure the message is printed, the
    application is asked to quit and the function returns.

    Args:
        deviceName: Name of the connected device, as reported by the tracker.
    """
    # print which device has been connected
    print(f"Connected to: {deviceName}")
    # get handler using device name.
    handler = tracker.getInstrumentHandler(deviceName)
    # without a handler there is nothing to interact with
    if not handler:
        return

    # setup file name
    csv_writer = WriteCSV('dataFile.csv')
    # The local name must differ from the class name: assigning to
    # `writingThread` here would make it a local variable and raise
    # UnboundLocalError when the class is looked up.
    writer_thread = writingThread(csv_writer)
    # start sub thread for write functionality.
    writer_thread.start()

    # manages DC data input and output
    # add more variables if you want to print more data to the console
    def onDcData(channel, data):
        print("timestamp:", "{:.9f}".format(data.timestamp), "workingElectrodeVoltage: ",
              "{:.9f}".format(data.workingElectrodeVoltage))
        # send the signal to writing thread to write the information in csv file.
        writer_thread.writeData.emit(data.timestamp, data.workingElectrodeVoltage)

    # manages AC data input and output
    # add more variables if you want to print more data to the console
    def onAcData(channel, data):
        print("frequency:", "{:.9f}".format(data.frequency),
              "absoluteImpedance: ", "{:.9f}".format(data.absoluteImpedance),
              "phaseAngle: ", "{:.9f}".format(data.phaseAngle))

    # print when experiment has stopped, tell the writing thread to close
    # the csv file, then leave the event loop.
    def onExperimentStopped(channel):
        print(f"Experiment completed on channel {channel}")
        writer_thread.stopTowrite.emit()
        app.quit()

    handler.activeDCDataReady.connect(onDcData)
    handler.activeACDataReady.connect(onAcData)
    # inform the other device when a new element is starting
    handler.experimentNewElementStarting.connect(lambda channel, data: writeDataToPort(data.stepName))
    handler.experimentStopped.connect(onExperimentStopped)

    # initialize an experiment
    experiment = AisExperiment()

    # define a constant current experiment at 0.1 A, with 1 s sampling time, and a duration of 10 s
    ccElement = AisConstantCurrentElement(0.1, 1, 10)
    # define an open circuit experiment with a duration of 10 s and a sampling time of 2 s
    opencircuitElement = AisOpenCircuitElement(10, 2)

    # add constant current as the first element in the list
    # element runs 1 time
    successfullyadd = experiment.appendElement(ccElement, 1)
    # add open circuit as the second and third elements in the list
    # element runs 2 times
    # `&=` so that a failure of either append is reported (with `|=` an
    # error was only detected when both appends failed).
    successfullyadd &= experiment.appendElement(opencircuitElement, 2)

    if not successfullyadd:
        print("Error adding element to experiment")
        app.quit()
        # do not upload or start an incomplete experiment
        return

    # upload experiment to channel 1 (channel index 0)
    error = handler.uploadExperimentToChannel(0, experiment)
    if error.value() != AisErrorCode.Success:
        print(error.message())
        app.quit()
        # nothing was uploaded, so there is nothing to start
        return

    # start experiment on channel 1 (channel index 0)
    error = handler.startUploadedExperiment(0)
    if error.value() != AisErrorCode.Success:
        print(error.message())
        app.quit()
206
# connect to device associated with the tracker
tracker.newDeviceConnected.connect(onNewDeviceConnected)

# Request the device to connect using com port 4
error = tracker.connectToDeviceOnComPort("COM4")
# same success check as used for the upload/start calls in this file
if error.value() != AisErrorCode.Success:
    print(error.message())
    sys.exit()

# print the data which is received from another device (other than Squidstat)
serialPortReader.dataReceived.connect(lambda data: print("Received data from COM port 3:", data))
# setup a sub thread for reading and writing the information from the other device.
serialPortThread = QThread()
# pushes object to another thread
serialPortReader.moveToThread(serialPortThread)
# NOTE(review): nothing in the visible script invokes serialPortReader.run(),
# so dataReceived is never emitted as written. Connecting
# serialPortThread.started to it looks intended - confirm QSerialPort thread
# affinity before adding that connection.
# start the sub thread of other device.
serialPortThread.start()
# run the event loop until app.quit() is called
exitCode = app.exec()
# release the serial port and stop the sub thread before the interpreter
# exits, so the QThread is not destroyed while it is still running
serialPortReader.closePort()
serialPortThread.quit()
serialPortThread.wait()
# exit program
sys.exit(exitCode)
an experiment that simulates a constant current flow with more advance options for stopping the exper...
Definition AisConstantCurrentElement.h:18
static AisDeviceTracker * Instance()
get the instance of the device tracker.
this class is used to create custom experiments. A custom experiment contains one or more elements....
Definition AisExperiment.h:22
This experiment observes the open circuit potential of the working electrode for a specific period of...
Definition AisOpenCircuitElement.h:18