Loading...
Searching...
No Matches
tcpServer.py

This example file shows how to create a server that connects to an instrument.

This example file shows how to create a server that connects to an instrument. It can receive TCP commands from a client to trigger experiments and send data back to the client.

1"""! @example tcpServer.py
2@brief This example file shows how to create a server that connects to an instrument.
3
4It can receive TCP commands from a client to trigger experiments and send data back to the client.
5"""
6import os
7import socket
8import threading
9from PySide6.QtWidgets import QApplication
10from SquidstatPyLibrary import AisDeviceTracker
11from SquidstatPyLibrary import AisExperiment
12from SquidstatPyLibrary import AisOpenCircuitElement
13from SquidstatPyLibrary import AisErrorCode
14
# Address and port the TCP server socket is bound to
HOST = "localhost"
PORT = 12345

# COM port and device name used to reach the Squidstat
SQUIDCOMPORT = "COM1"
SQUIDNAME = "Plus2000"

# Qt application object; its event loop is started at the end of the script
app = QApplication([])
# Every socket opened by the server, so they can all be closed on shutdown
activeSockets = []
26
27# This will build and start the Open Circuit Potential experiment
class _LocalFailure:
    """Error-like object for failures detected before the device is contacted.

    It mirrors the two methods this script calls on library error objects
    (``message()`` and ``value()``) so callers can treat it the same way.
    """

    def __init__(self, text):
        self._text = text

    def message(self):
        """Return the human-readable failure description."""
        return self._text

    def value(self):
        """Return ``None``, which never compares equal to the Success code."""
        return None


def start_ocp_experiment(handler, durationSec=60):
    """Build an Open Circuit Potential experiment and start it on channel 0.

    Args:
        handler: Instrument handler used to upload and start the experiment.
        durationSec: First argument given to ``AisOpenCircuitElement``
            (the second argument is fixed at 1).

    Returns:
        An object exposing ``message()``:
        * a local failure object if the element could not be appended,
        * the upload error if the upload did not report Success,
        * otherwise the result of ``handler.startUploadedExperiment(0)``.
    """
    # Create an experiment with a single open-circuit element
    experiment = AisExperiment()
    ocpElement = AisOpenCircuitElement(durationSec, 1)

    if not experiment.appendElement(ocpElement, 1):
        failure_text = "Error adding element to experiment"
        print(failure_text)
        # The original code read `error` here before it was ever assigned,
        # which raised UnboundLocalError instead of reporting the failure.
        return _LocalFailure(failure_text)

    # Upload the experiment to channel 0
    error = handler.uploadExperimentToChannel(0, experiment)
    if error.value() != AisErrorCode.ErrorCode.Success:
        return error

    # Start the experiment
    return handler.startUploadedExperiment(0)
45
46# Send a specified command to our Squidstat
def command_to_device(command, handler):
    """Parse a text command from the client and run it against the Squidstat.

    The command is split on whitespace: the first token is the action and an
    optional second token is an integer argument.

    Supported actions:
        ``startExperiment [seconds]``: start the OCP experiment. When the
            argument is missing or not an integer, the default duration of
            ``start_ocp_experiment`` is used.
        ``stopExperiment``: stop the experiment on channel 0.

    Args:
        command: Raw command text received from the client.
        handler: Instrument handler the command is applied to.

    Returns:
        Whatever the executed action returned, or ``None`` when the command
        is empty or not recognised.
    """
    # split() without a separator also drops trailing newlines/carriage
    # returns, so "stopExperiment\n" is still recognised.
    tokens = command.split()
    if not tokens:
        return None
    action = tokens[0]

    # Here you can add various commands which can be sent from the tcpClient
    # to directly interact with the Squidstat
    if action == 'startExperiment':
        if len(tokens) > 1:
            try:
                return start_ocp_experiment(handler, int(tokens[1]))
            except ValueError:
                # Non-numeric argument: fall through to the default duration
                pass
        return start_ocp_experiment(handler)

    if action == 'stopExperiment':
        return handler.stopExperiment(0)

    return None
68
69# Handle commands from the client
def handle_command(command, handler, client_socket):
    """Execute one client command and send the outcome text back.

    Args:
        command: Command text received from the client.
        handler: Instrument handler passed on to ``command_to_device``.
        client_socket: Connected socket the reply is written to.

    The reply is the ``message()`` of the object returned by
    ``command_to_device``, or "Unknown Command" when it returned ``None``.
    """
    result = command_to_device(command, handler)
    if result is None:
        reply = "Unknown Command"
    else:
        reply = result.message()

    # sendall keeps writing until the whole reply is out; plain send() may
    # transmit only part of the buffer.
    client_socket.sendall("{}".format(reply).encode())
78
79# Listen for the client's messages, and disconnect signals and terminate program when finished
def handle_client(handler, client_socket):
    """Serve one client until it disconnects, then clean up and end the process.

    Args:
        handler: Instrument handler whose signals are disconnected on exit and
            which receives the client's commands.
        client_socket: Connected client socket to read commands from.

    The function does not return: after cleanup it calls ``os._exit(1)``.
    """
    print("Client connected")

    while True:
        try:
            # Undecodable bytes are replaced rather than raising, so a
            # malformed packet cannot bypass the cleanup below.
            data = client_socket.recv(1024).decode(errors="replace")

            # An empty read means the client has closed the connection
            if not data:
                break

            # Handle the command (this also writes the reply to the socket)
            handle_command(data, handler, client_socket)
        except OSError:
            # Covers ConnectionResetError as well as other socket failures,
            # e.g. a broken pipe or the socket being closed elsewhere.
            break

    handler.activeDCDataReady.disconnect()
    handler.activeACDataReady.disconnect()
    handler.experimentNewElementStarting.disconnect()
    handler.experimentStopped.disconnect()
    command_to_device("stopExperiment", handler)

    # Close the client socket
    client_socket.close()
    print("Client disconnected")
    # End the whole process from this worker thread
    os._exit(1)
107
108# Send data to the client based on the type of event (Hooked up to signals)
def send_data_to_client(client_socket, event_type, data):
    """Format an instrument event as text and send it to the client.

    Args:
        client_socket: Connected socket the message is written to.
        event_type: One of "DCData", "ACData", "NewElement" or
            "ExperimentCompleted"; any other value is ignored.
        data: Event payload. The attributes read depend on ``event_type``
            (e.g. ``timestamp`` and ``workingElectrodeVoltage`` for "DCData");
            for "ExperimentCompleted" the value itself is formatted.

    Returns:
        None. Nothing is sent for an unrecognised ``event_type``.
    """
    if event_type == "DCData":
        message = "timestamp: {:.9f}, workingElectrodeVoltage: {:.9f}".format(
            data.timestamp, data.workingElectrodeVoltage
        )
    elif event_type == "ACData":
        message = "frequency: {:.9f}, absoluteImpedance: {:.9f}, phaseAngle: {:.9f}".format(
            data.frequency, data.absoluteImpedance, data.phaseAngle
        )
    elif event_type == "NewElement":
        message = "New Node beginning: {}, step number: {}, step sub: {}".format(
            data.stepName, data.stepNumber, data.substepNumber
        )
    elif event_type == "ExperimentCompleted":
        message = "Experiment Completed: {}".format(data)
    else:
        return

    # sendall guarantees the full message is written; send() may stop early
    client_socket.sendall(message.encode())
122
def terminate_program():
    """Block on console input until interrupted, then shut the server down.

    Lines typed on the console are discarded. On end-of-input or a keyboard
    interrupt, every socket in ``activeSockets`` is closed, the Qt
    application is asked to quit and the process is ended with ``os._exit(1)``.
    """
    print("Press <CTRL>+c to close the server")

    # Swallow console input until the stream ends or the user interrupts
    while True:
        try:
            input()
        except (EOFError, KeyboardInterrupt):
            break

    for open_socket in activeSockets:
        open_socket.close()

    app.quit()
    os._exit(1)
134
135
136# Create the device tracker and connect to the Squidstat we will be using
# Create the device tracker and connect to the Squidstat we will be using
print(f"Attempting to connect to the Squidstat {SQUIDNAME} on {SQUIDCOMPORT}...")
# The tracker must exist before its signals and methods are used below
tracker = AisDeviceTracker.Instance()
tracker.newDeviceConnected.connect(lambda deviceName: print("Device is Connected: %s" % deviceName))
error = tracker.connectToDeviceOnComPort(SQUIDCOMPORT)

if error.value() != AisErrorCode.ErrorCode.Success:
    print(error.message())
    # Report the failed connection through a non-zero exit status
    raise SystemExit(1)

# Create the instrument handler
handler = tracker.getInstrumentHandler(SQUIDNAME)
print("Connection successful\n")

# Create the TCP/IP socket and bind it to our host
print("Starting server...")
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
activeSockets.append(server_socket)
server_socket.bind((HOST, PORT))

# Listen for incoming connections
server_socket.listen(1)

print("Server started successfully. Waiting for client connection...")

# Watch the console in a separate thread so the server can be shut down
terminal_thread = threading.Thread(target=terminate_program)
terminal_thread.start()

# Accept a client connection (blocks until a client connects)
client_socket, client_address = server_socket.accept()
activeSockets.append(client_socket)

# Connect the signals to send data to the client
handler.activeDCDataReady.connect(lambda channel, data: send_data_to_client(client_socket, "DCData", data))
handler.activeACDataReady.connect(lambda channel, data: send_data_to_client(client_socket, "ACData", data))
handler.experimentNewElementStarting.connect(lambda channel, data: send_data_to_client(client_socket, "NewElement", data))
handler.experimentStopped.connect(lambda channel: send_data_to_client(client_socket, "ExperimentCompleted", channel))

# Start the listening process in a separate thread
listening_thread = threading.Thread(target=handle_client, args=(handler, client_socket))
listening_thread.start()

# Start the QT event loop
app.exec()
static AisDeviceTracker * Instance()
get the instance of the device tracker.
this class is used to create custom experiments. A custom experiment contains one or more elements....
Definition AisExperiment.h:22
This experiment observes the open circuit potential of the working electrode for a specific period of...
Definition AisOpenCircuitElement.h:18