lang/python/ PythonThreading1
Examples
From here
from time import sleep, perf_counter
from threading import Thread
def task():
    """Simulate a blocking job: announce the start, sleep one second, report completion."""
    print('Starting a task...')
    # The sleep stands in for real blocking work.
    sleep(1)
    print('done')
# Take a timestamp before any thread is created so the whole run is measured.
start_time = perf_counter()

# create two new threads
t1 = Thread(target=task)
t2 = Thread(target=task)

# start the threads
t1.start()
t2.start()

# wait for the threads to complete
t1.join()
t2.join()

# Report the elapsed wall-clock time. Previously start_time was recorded but
# never read, so the example never showed how long the two tasks took.
end_time = perf_counter()
print('It took %0.2f second(s) to complete.' % (end_time - start_time))
Event listener example
This starts one thread per input device node (/dev/input/event*,
/dev/input/mouse* and /dev/input/mice) and prints each event that occurs
along with the /dev node it came from.
#!/usr/bin/python
import struct
import time
import sys
from glob import glob
from threading import Thread
def main():
    """
    Discover the input device nodes and start one listener thread per node.

    The list of discovered nodes is printed first, then each node is printed
    again as its thread is started.
    """
    # NOTE(review): the mouse*/mice nodes may not use the same record layout
    # as the event* nodes that listen1() decodes -- confirm before relying on
    # their output.
    patterns = ("/dev/input/event*", "/dev/input/mouse*", "/dev/input/mice")
    nodes = [path for pattern in patterns for path in glob(pattern)]
    print(nodes)

    # Keep the thread handles keyed by node so a caller could join them later;
    # nothing waits on them here.
    threads = {}
    for node in nodes:
        print(node)
        threads[node] = start_thread(node)
def start_thread(node):
    """
    Start a thread that runs listen(node) and return the Thread object.

    The node path is printed before the thread is created.
    """
    print(node)
    worker = Thread(target=listen, args=(node,))
    worker.start()
    return worker
def listen(node):
    """
    Thread entry point: stream events from *node* via listen1().

    A KeyboardInterrupt raised while listening is reported as "Ctrl-C" and
    converted into SystemExit(0).
    """
    try:
        listen1(node)
    except KeyboardInterrupt:
        print("Ctrl-C")
        # Use sys.exit() rather than the bare exit(): exit is injected by the
        # site module for interactive use and is absent under "python -S".
        # NOTE(review): CPython raises KeyboardInterrupt in the main thread,
        # and SystemExit raised in a worker thread only ends that thread, so
        # this handler is unlikely to stop the program -- confirm the intended
        # Ctrl-C behaviour.
        sys.exit(0)
def listen1(node):
    """
    Read input events from *node* and print each one until end of file.

    FORMAT represents the format used by linux kernel input event struct
    See https://github.com/torvalds/linux/blob/v5.5-rc5/include/uapi/linux/input.h#L28
    Stands for: long int (tv_sec), long int (tv_usec), unsigned short (type),
    unsigned short (code), signed int (value).

    The value field is signed in the kernel struct (__s32), so it is decoded
    with 'i'; decoding it as unsigned turned negative values such as relative
    motion of -1 into 4294967295.

    node -- path of the device node (or any file holding packed events).
    Returns None. Reading stops at end of file; a trailing record shorter
    than one full event is ignored instead of raising struct.error.
    """
    FORMAT = 'llHHi'
    EVENT_SIZE = struct.calcsize(FORMAT)

    # Open in binary mode; the context manager closes the node even when
    # reading or decoding raises.
    with open(node, "rb") as in_file:
        while True:
            event = in_file.read(EVENT_SIZE)
            # An empty read is EOF; a short read is a truncated record that
            # cannot be unpacked, so both end the loop.
            if len(event) < EVENT_SIZE:
                break

            # ev_type rather than "type" so the builtin is not shadowed.
            tv_sec, tv_usec, ev_type, code, value = struct.unpack(FORMAT, event)

            if ev_type != 0 or code != 0 or value != 0:
                # %06d zero-pads the microseconds so that 5 us is printed as
                # ".000005" rather than the misleading ".5".
                print("%s: Event type %u, code %u, value %d at %d.%06d" %
                      (node, ev_type, code, value, tv_sec, tv_usec))
            else:
                # Events with code, type and value == 0 are "separator" events
                print("===========================================")
# Start the listeners only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()