Loading...
Searching...
No Matches
writeinCSV.py
1"""! \example writeinCSV.py """
2
3"""
4This is an example of the writeinCSV.py file, which helps control Squidstat in parallel with other devices.
5In this example, we inform other devices using SerialPortReader::writeData when a new element starts executing
6inside Squidstat, and also print the data received from the other device using the SerialPortReader::dataReceived signal.
7All operations occur in parallel with the Squidstat operation.
8
9The Squidstat data is also written to a CSV file.
10
11In detail:
121. The `SerialPortReader` class handles reading from and writing to a serial port of other device.
132. The `WriteCSV` class is responsible for writing data to a CSV file which is received from Squidstat.
143. The `writingThread` class is used to manage the data writing in csv file process in a separate thread.
15"""
16
17import sys
18from PySide6.QtCore import QIODevice, QThread, QObject, Signal
19from PySide6.QtSerialPort import QSerialPort
20from PySide6.QtCore import QCoreApplication
21from SquidstatPyLibrary import AisDeviceTracker
22from SquidstatPyLibrary import AisErrorCode
23from SquidstatPyLibrary import AisExperiment
24from SquidstatPyLibrary import AisConstantCurrentElement
25from SquidstatPyLibrary import AisOpenCircuitElement
26
27# initialize the application
28app = QCoreApplication([])
29
30# convert incoming data to string with single line
31def convert_to_csv_line(data_list):
32 return ','.join(str(item) for item in data_list)
33
34# \cond EXCLUDE_FROM_DOX
35# class for reading and writing data from serial port of other device.
36class SerialPortReader(QObject):
37 # define the Qt signal.
38 dataReceived = Signal(str)
39
40 # initialize self and port
41 def __init__(self, port):
42 super().__init__()
43 self.port = port
44
45 # open port if closed
46 # get data and decode
47 # emit data
48 def run(self):
49 if not self.port.isOpen():
50 self.port.open(QIODevice.ReadOnly)
51 while self.port.isOpen():
52 if self.port.waitForReadyRead():
53 data = self.port.readAll().data().decode()
54 self.dataReceived.emit(data)
55
56 # open port if closed
57 # write data and encode
58 def writeData(self, data):
59 if not self.port.isOpen():
60 successfullyOpen = self.port.open(QIODevice.WriteOnly)
61 if not successfullyOpen:
62 print("USB port is not open.")
63 self.port.write(data.encode())
64
65 # check if port is open, close port if open
66 def closePort(self):
67 if self.port.isOpen():
68 self.port.close()
69
70
71# class for writing data to a csv file. data received from Squidstat.
72class WriteCSV:
73 # init filename and file
74 def __init__(self, filename):
75 self.filename = filename
76 self.file = None
77
78 # open file and write headers
79 def write_header(self, header):
80 if self.file is None:
81 self.file = open(self.filename, 'w')
82 self.file.write(convert_to_csv_line(header) + '\n')
83
84 # write data to file
85 def write_data(self, data):
86 if self.file is not None:
87 self.file.write(convert_to_csv_line(data) + '\n')
88
89 # close file when we are done
90 def close(self):
91 if self.file is not None:
92 self.file.close()
93
94
95# class to handle the write funcationality on seprate thread in pareller operation of Squidstat and other device.
96class writingThread(QThread):
97 # define the signal.
98 writeData = Signal(float, float)
99 stopTowrite = Signal()
100
101 # init self with values to be written
102 def __init__(self, csv_writer):
103 super().__init__()
104 self.timestamps = []
105 self.voltages = []
106 self.csv_writer = csv_writer
107
108 # setup data file with headers and connect the call back function on emitting of Qt signal.
109 def run(self):
110 self.csv_writer.write_header(['Timestamp', 'Working Electrode Voltage'])
111 self.writeData.connect(self.add_data)
112 self.stopTowrite.connect(self.close)
113
114 # add data into list as well as call back handler you can use to write the data in csv file.
115 def add_data(self, timestamp, voltage):
116 self.timestamps.append(timestamp)
117 self.voltages.append(voltage)
118 self.csv_writer.write_data([timestamp, voltage])
119
120 # close writer for a channel
121 def close(self):
122 self.csv_writer.close()
123# \endcond
124
125# setup serial port of other device.
126serialPort = QSerialPort("COM3")
127# setup baud rate of other device.
128serialPort.setBaudRate(QSerialPort.Baud9600)
129# set up communication data type of other device.
130serialPort.setDataBits(QSerialPort.Data8)
131# define a serial port reader thread.
132serialPortReader = SerialPortReader(serialPort)
133
134
135# function to write data from the serial port of other device.
136def writeDataToPort(data):
137 serialPortReader.writeData(data)
138
139# instantiate a Squidstat device tracker
141
142# interact with data and send experiments to Squidstat device
143def onNewDeviceConnected(deviceName):
144 # print which device has been connected
145 print(f"Connected to: {deviceName}")
146 # get handler using device name.
147 handler = tracker.getInstrumentHandler(deviceName)
148 # if handler is present for the particular device then we can interact with the data and upload/start/stop/puase/resume experiments
149 if handler:
150 # setup file name
151 csv_writer = WriteCSV('dataFile.csv')
152 # add csv file to writing thread.
153 writingThread = writingThread(csv_writer)
154 # start sub thread for write funcationality.
155 writingThread.start()
156
157 # manages DC data input and output
158 # add more variables if you want to print more data to the console
159 # send the signal to writing thread to write the information in csv file.
160 handler.activeDCDataReady.connect(lambda channel, data: (
161 print("timestamp:", "{:.9f}".format(data.timestamp), "workingElectrodeVoltage: ",
162 "{:.9f}".format(data.workingElectrodeVoltage)),
163 writingThread.writeData.emit(data.timestamp, data.workingElectrodeVoltage)
164 ))
165
166 # manages AC data input and output
167 # add more variables if you want to print more data to the console
168 handler.activeACDataReady.connect(lambda channel, data: print("frequency:", "{:.9f}".format(data.frequency),
169 "absoluteImpedance: ", "{:.9f}".format(
170 data.absoluteImpedance), "phaseAngle: ",
171 "{:.9f}".format(data.phaseAngle)))
172 # write when new node is starting
173 handler.experimentNewElementStarting.connect(lambda channel, data: writeDataToPort(data.stepName))
174 # print when experiment has stopped, stop writeting thread
175 # send the signal to writing thread experiment is completed, which will close the csv file.
176 handler.experimentStopped.connect(lambda channel: (print(f"Experiment completed on channel {channel}"), writingThread.stopTowrite.emit(), app.quit()))
177
178 # initialize an experiment
179 experiment = AisExperiment()
180
181 # define a constant current experiment at 0.1 A, with 1 s sampling time, and a duration of 10 s
182 ccElement = AisConstantCurrentElement(0.1, 1, 10)
183 # define an open circuit experiment with a duration of 10 s and a sampling time of 2 s
184 opencircuitElement = AisOpenCircuitElement(10, 2)
185
186 # add constant current as the first element in the list
187 # element runs 1 time
188 successfullyadd = experiment.appendElement(ccElement, 1)
189 # add open circuit as the second and thirds elements in the list
190 # element runs 2 times
191 successfullyadd |= experiment.appendElement(opencircuitElement, 2)
192
193 if not successfullyadd:
194 print("Error adding element to experiment")
195 app.quit()
196
197 # upload experiment to channel 1
198 error = handler.uploadExperimentToChannel(0, experiment)
199 if error.value() != AisErrorCode.Success:
200 print(error.message())
201 app.quit()
202
203 # start experiment on channel 1
204 error = handler.startUploadedExperiment(0)
205 if error.value() != AisErrorCode.Success:
206 print(error.message())
207 app.quit()
208
209# connect to device associated with the tracker
210tracker.newDeviceConnected.connect(onNewDeviceConnected)
211
212# Request the device to connect using com port 4
213error = tracker.connectToDeviceOnComPort("COM4")
214if error:
215 print(error.message())
216 sys.exit()
217
218# print the data which is received from another device (other then Squidstat)
219serialPortReader.dataReceived.connect(lambda data: print("Received data from COM port 3:", data))
220# setup a sub thread for read and write the information from the other device.
221serialPortThread = QThread()
222# pushes object to another thread
223serialPortReader.moveToThread(serialPortThread)
224# start the sub thread of other device.
225serialPortThread.start()
226# exit program
227sys.exit(app.exec())
an experiment that simulates a constant current flow with more advance options for stopping the exper...
Definition AisConstantCurrentElement.h:18
static AisDeviceTracker * Instance()
get the instance of the device tracker.
this class is used to create custom experiments. A custom experiment contains one or more elements....
Definition AisExperiment.h:22
This experiment observes the open circuit potential of the working electrode for a specific period of...
Definition AisOpenCircuitElement.h:18