63 def on(input_callback, event_name, pipeline, callback, taplength=150):
70 _listenables.append( (input_callback,
96 def get_current_millis():
97 return int(time.time() * 1000)
106 for index, listenable
in enumerate(_listenables):
109 input_callback,event_name,pipeline,callback,taplength = listenable
112 new = input_callback()
115 if event_name == Events.ALWAYS:
116 callback(pipeline.run(new))
122 elif event_name == Events.CHANGED:
123 if _results[index] != new:
124 _results[index] = new
125 callback(pipeline.run(new))
140 if float(new) != 0.0:
143 if float(_results[index]) == 0.0:
149 if taplength == 0
and event_name == Events.PRESSED:
150 callback(pipeline.run(new))
152 _results[index] = 2.0
158 _times[index] = get_current_millis()
162 _results[index] = 1.0
166 elif float(_results[index]) == 1.0:
169 if (get_current_millis() - _times[index]) >= taplength:
174 if event_name == Events.PRESSED:
175 callback(pipeline.run(new))
176 _results[index] = 2.0
186 if float(_results[index]) == 2.0:
187 if event_name == Events.RELEASED:
188 callback(pipeline.run(new))
191 _results[index] = 0.0
195 elif float(_results[index]) == 1.0:
196 if event_name == Events.TAPPED:
197 callback(pipeline.run(new))
200 _results[index] = 0.0
270 if isinstance(step, (tuple, list)):
275 elif isinstance(step, Pipeline):
281 elif isinstance(step, (types.FunctionType,
283 types.BuiltinFunctionType)):
284 self.
steps.append(step)
299 for step
in self.
steps:
def tick()
Poll all the inputs.
def __init__(self, steps)
Create a new pipeline.
Poll inputs and listen for changes.
A simplistic but flexible pipeline implementation.
def add(self, steps)
Add steps to the pipline.
def on(input_callback, event_name, pipeline, callback, taplength=150)
Add an input to the poller.
def run(self, value)
Run this pipeline.
Define high-level events for polling.