This is an example of the writeinCSV.py file, which helps control Squidstat in parallel with other devices.
This is an example of the writeinCSV.py file, which helps control Squidstat in parallel with other devices. In this example, we inform other devices using SerialPortReader::writeData when a new element starts executing inside Squidstat, and also print the data received from the other device using the SerialPortReader::dataReceived signal. All operations occur in parallel with the Squidstat operation.
The Squidstat data is also written to a CSV file.
1"""! @example writeinCSV.py
This is an example of the writeinCSV.py file, which helps control Squidstat in parallel with other devices.
In this example, we inform other devices using SerialPortReader::writeData when a new element starts executing
inside Squidstat, and also print the data received from the other device using the SerialPortReader::dataReceived signal.
All operations occur in parallel with the Squidstat operation.

The Squidstat data is also written to a CSV file.

In detail:
1. The `SerialPortReader` class handles reading from and writing to the serial port of the other device.
2. The `WriteCSV` class is responsible for writing the data received from Squidstat to a CSV file.
3. The `writingThread` class is used to manage the process of writing data to the CSV file in a separate thread.
13"""
14
import csv
import io
import sys

from PySide6.QtCore import QIODevice, QThread, QObject, Signal
from PySide6.QtSerialPort import QSerialPort
from PySide6.QtWidgets import QApplication
from SquidstatPyLibrary import AisDeviceTracker
from SquidstatPyLibrary import AisErrorCode
from SquidstatPyLibrary import AisExperiment
from SquidstatPyLibrary import AisConstantCurrentElement
from SquidstatPyLibrary import AisOpenCircuitElement
24
25
# The Qt application object; its event loop delivers every signal used below.
app = QApplication([])
27
28
def convert_to_csv_line(data_list):
    """Return the items of *data_list* as one CSV record, without a line ending.

    Every item is converted with ``str()`` and the fields are joined with
    commas.  Unlike a plain ``','.join(...)``, fields that contain a comma, a
    double quote or a line break are quoted (and embedded quotes doubled) by
    the ``csv`` module, so such values cannot split or corrupt the record.

    Args:
        data_list: Iterable of values making up one row.

    Returns:
        The CSV-encoded row as a ``str``; an empty iterable yields ``''``.
        A row consisting of a single empty string is written as ``'""'`` so
        that it stays distinguishable from an empty row.
    """
    terminator = '\r\n'
    buffer = io.StringIO()
    # Keep a real line terminator while encoding so that fields containing
    # line-break characters are quoted, then strip it from the result.
    writer = csv.writer(buffer, lineterminator=terminator)
    writer.writerow([str(item) for item in data_list])
    return buffer.getvalue()[:-len(terminator)]
31
32
33
class SerialPortReader(QObject):
    """Read from and write to the serial port of the companion device.

    Text received on the port is published through the ``dataReceived``
    signal; ``writeData`` sends text to the device.  The port is opened
    lazily, in read/write mode, by whichever of ``run`` or ``writeData`` is
    called first, so that one direction never locks out the other.
    """

    # Emitted with the decoded text of every chunk read from the port.
    dataReceived = Signal(str)

    def __init__(self, port):
        """Store *port*, the serial-port object this reader operates on."""
        super().__init__()
        self.port = port

    def _ensureOpen(self):
        """Open the port for reading and writing if needed; return True when it is open."""
        if self.port.isOpen():
            return True
        return bool(self.port.open(QIODevice.ReadWrite))

    def run(self):
        """Read from the port until it is closed, emitting ``dataReceived`` per chunk.

        This call blocks for as long as the port stays open, so it is meant
        to be executed outside the GUI thread.
        """
        if not self._ensureOpen():
            print("USB port is not open.")
            return
        while self.port.isOpen():
            if self.port.waitForReadyRead():
                raw = self.port.readAll().data()
                # A read may end in the middle of a multi-byte character or
                # contain line noise; replace undecodable bytes instead of
                # raising and terminating the read loop.
                self.dataReceived.emit(raw.decode(errors="replace"))

    def writeData(self, data):
        """Encode the string *data* and write it to the port.

        If the port cannot be opened, a message is printed and nothing is
        written.
        """
        if not self._ensureOpen():
            print("USB port is not open.")
            return
        self.port.write(data.encode())

    def closePort(self):
        """Close the port if it is currently open."""
        if self.port.isOpen():
            self.port.close()
67
68
69
class WriteCSV:
    """Write rows of data to a CSV file.

    The file is created (truncating any existing file) by the first call to
    ``write_header``.  Rows passed to ``write_data`` while no file is open are
    silently ignored.  Every value is converted with ``str()`` and encoded by
    the ``csv`` module, so values containing commas or quotes are quoted
    correctly.  Lines end with ``'\\n'``.
    """

    def __init__(self, filename):
        """Remember *filename*; nothing is opened until ``write_header``."""
        self.filename = filename
        self.file = None     # open text handle, or None while no file is open
        self._writer = None  # csv.writer bound to self.file

    def _write_row(self, row):
        """Encode *row* as one CSV record and append it to the open file."""
        if self._writer is None:
            # self.file was assigned from outside; bind a writer to it.
            self._writer = csv.writer(self.file, lineterminator='\n')
        self._writer.writerow([str(item) for item in row])

    def write_header(self, header):
        """Open the file if necessary and write *header* as a row."""
        if self.file is None:
            # newline='' is what the csv module expects; it prevents the text
            # layer from translating line endings a second time.
            self.file = open(self.filename, 'w', newline='', encoding='utf-8')
            self._writer = csv.writer(self.file, lineterminator='\n')
        self._write_row(header)

    def write_data(self, data):
        """Append *data* as a row; ignored while no file is open."""
        if self.file is not None:
            self._write_row(data)

    def close(self):
        """Close the file; safe to call repeatedly.

        The handle is reset to ``None`` so that later ``write_data`` calls
        are ignored instead of failing on a closed file.
        """
        if self.file is not None:
            self.file.close()
            self.file = None
            self._writer = None
91
92
93
class writingThread(QThread):
    """Thread object that records Squidstat samples through a CSV writer.

    Emit ``writeData(timestamp, voltage)`` to record one sample and
    ``stopTowrite()`` to close the CSV writer.  Both signals are wired to
    their handlers when ``run`` executes, i.e. after ``start()``.
    """

    # Request to close the underlying CSV writer.
    stopTowrite = Signal()
    # One sample: (timestamp, working electrode voltage).
    writeData = Signal(float, float)

    def __init__(self, csv_writer):
        """Keep *csv_writer* and start with empty in-memory sample lists."""
        super().__init__()
        self.csv_writer = csv_writer
        self.voltages = []
        self.timestamps = []

    def run(self):
        """Write the CSV header, then hook the signals up to their handlers."""
        # The header is written before the connections are made, so no data
        # row can reach the file ahead of it.
        header = ['Timestamp', 'Working Electrode Voltage']
        self.csv_writer.write_header(header)
        # NOTE(review): the handlers are methods of this QThread object;
        # confirm in which thread Qt actually invokes them.
        self.stopTowrite.connect(self.close)
        self.writeData.connect(self.add_data)

    def add_data(self, timestamp, voltage):
        """Store one sample in memory and append it to the CSV file."""
        row = [timestamp, voltage]
        self.timestamps.append(timestamp)
        self.voltages.append(voltage)
        self.csv_writer.write_data(row)

    def close(self):
        """Close the CSV writer."""
        self.csv_writer.close()
121
122
123
# Serial link to the companion device: port COM3, 8 data bits, 9600 baud.
serialPort = QSerialPort("COM3")
serialPort.setDataBits(QSerialPort.Data8)
serialPort.setBaudRate(QSerialPort.Baud9600)

# Reader/writer object that operates on that port.
serialPortReader = SerialPortReader(serialPort)
131
132
133
def writeDataToPort(data):
    """Send the string *data* to the companion device via the module-level serialPortReader."""
    send = serialPortReader.writeData
    send(data)
136
137
139
140
def onNewDeviceConnected(deviceName):
    """Set up logging for a newly connected device and start the experiment.

    Looks up the instrument handler for *deviceName*, starts the CSV writing
    thread, connects the handler's data/progress signals, then builds,
    uploads and starts the experiment on channel 0.  Any failure is printed
    and the application is asked to quit.

    Args:
        deviceName: Name of the device reported by the tracker.
    """
    print(f"Connected to: {deviceName}")

    handler = tracker.getInstrumentHandler(deviceName)
    if not handler:
        return

    csv_writer = WriteCSV('dataFile.csv')
    # The local must not be called `writingThread`: assigning to that name
    # inside this function would make it a local variable and the class
    # lookup on the right-hand side would raise UnboundLocalError.
    csv_thread = writingThread(csv_writer)
    csv_thread.start()

    def onDcData(channel, data):
        # Echo the DC sample and forward it to the CSV writing thread.
        print("timestamp:", "{:.9f}".format(data.timestamp), "workingElectrodeVoltage: ",
              "{:.9f}".format(data.workingElectrodeVoltage))
        csv_thread.writeData.emit(data.timestamp, data.workingElectrodeVoltage)

    def onAcData(channel, data):
        # AC samples are only printed, not written to the CSV file.
        print("frequency:", "{:.9f}".format(data.frequency),
              "absoluteImpedance: ", "{:.9f}".format(data.absoluteImpedance),
              "phaseAngle: ", "{:.9f}".format(data.phaseAngle))

    def onElementStarting(channel, data):
        # Tell the companion device which step is about to run.
        writeDataToPort(data.stepName)

    def onExperimentStopped(channel):
        print(f"Experiment completed on channel {channel}")
        csv_thread.stopTowrite.emit()
        app.quit()

    handler.activeDCDataReady.connect(onDcData)
    handler.activeACDataReady.connect(onAcData)
    handler.experimentNewElementStarting.connect(onElementStarting)
    handler.experimentStopped.connect(onExperimentStopped)

    # NOTE(review): the statements that create `experiment`, `ccElement` and
    # `opencircuitElement` are missing from this listing (presumably built
    # from the imported AisExperiment, AisConstantCurrentElement and
    # AisOpenCircuitElement classes). Restore them here from the original
    # example before running.

    # Attempt both appends, and treat the experiment as valid only if BOTH
    # succeeded (the previous `|=` hid a single failure).
    added_cc = experiment.appendElement(ccElement, 1)
    added_ocp = experiment.appendElement(opencircuitElement, 2)
    if not (added_cc and added_ocp):
        print("Error adding element to experiment")
        app.quit()
        return  # do not upload an incomplete experiment

    error = handler.uploadExperimentToChannel(0, experiment)
    if error.value() != AisErrorCode.Success:
        print(error.message())
        app.quit()
        return  # nothing was uploaded, so there is nothing to start

    error = handler.startUploadedExperiment(0)
    if error.value() != AisErrorCode.Success:
        print(error.message())
        app.quit()
206
207
# `tracker` is used here and inside onNewDeviceConnected, but its definition
# is absent from this listing; obtain the device tracker instance explicitly.
tracker = AisDeviceTracker.Instance()

# Run onNewDeviceConnected for every device the tracker reports.
tracker.newDeviceConnected.connect(onNewDeviceConnected)

# Connect to the Squidstat on COM4. The result is checked the same way as the
# other error codes in this file, and a failure exits with a non-zero status.
error = tracker.connectToDeviceOnComPort("COM4")
if error.value() != AisErrorCode.Success:
    print(error.message())
    sys.exit(1)

# Print everything the companion device sends.
serialPortReader.dataReceived.connect(lambda data: print("Received data from COM port 3:", data))

# Give the serial reader its own thread.
# NOTE(review): nothing in this listing invokes serialPortReader.run, so no
# data is read unless it is started elsewhere - confirm.
serialPortThread = QThread()
serialPortReader.moveToThread(serialPortThread)
serialPortThread.start()

# Enter the Qt event loop; the process exits with the loop's return code.
sys.exit(app.exec())
An experiment that simulates a constant current flow with more advanced options for stopping the exper...
Definition AisConstantCurrentElement.h:18
static AisDeviceTracker * Instance()
get the instance of the device tracker.
this class is used to create custom experiments. A custom experiment contains one or more elements....
Definition AisExperiment.h:22
This experiment observes the open circuit potential of the working electrode for a specific period of...
Definition AisOpenCircuitElement.h:18