iota  1.0.0
Flexible, pipeline-based input handling.
iota.py
1 # Copyright (c) 2017 Brandon Gong
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a copy
4 # of this software and associated documentation files (the "Software"), to deal
5 # in the Software without restriction, including without limitation the rights
6 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7 # copies of the Software, and to permit persons to whom the Software is
8 # furnished to do so, subject to the following conditions:
9 #
10 # The above copyright notice and this permission notice shall be included in all
11 # copies or substantial portions of the Software.
12 #
13 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19 # SOFTWARE.
20 
21 
25 
26 
27 import types
28 import time
29 from enum import Enum
30 
31 
32 
38 class Poller:
39 
40  # lists for storing various things.
41  _listenables = []
42  _results = []
43  _times = []
44 
45 
63  def on(input_callback, event_name, pipeline, callback, taplength=150):
64 
65  # Add all those arguments to the _listenables list as a tuple.
66  #
67  # Notice that we reinstantiate the Pipeline; this is intentional.
68  # Refer to the Pipeline documentation for more information on how
69  # it increases flexibility.
70  _listenables.append( (input_callback,
71  event_name,
72  Pipeline(pipeline),
73  callback,
74  taplength) )
75 
76  # and then grow the helper lists to the same size.
77  _results.append(None)
78  _times.append(0)
79 
80 
81 
92  def tick():
93 
94  # a little helper function so we don't have to write out the whole
95  # time computation every time.
96  def get_current_millis():
97  return int(time.time() * 1000)
98 
99  # FIXME: this is the ugliest part of the code. there is far too much
100  # nesting to be considered good python style.
101  # Furthermore, the logic is quite difficult to understand and also prone
102  # to hidden bugs.
103 
104  # iterate through listeners. We use `enumerate` here because we use the
105  # index to access corresponding values in the _results and _times lists.
106  for index, listenable in enumerate(_listenables):
107 
108  # unpack the listenable tuple for the sake of readability.
109  input_callback,event_name,pipeline,callback,taplength = listenable
110 
111  # get the new value.
112  new = input_callback()
113 
114  # if the event is ALWAYS, run the pipeline with the new value.
115  if event_name == Events.ALWAYS:
116  callback(pipeline.run(new))
117  continue
118 
119  # the idea for CHANGED is very similar except we compare with the
120  # _results value, which stores the value of the previous tick,
121  # and only run the pipeline if it has changed.
122  elif event_name == Events.CHANGED:
123  if _results[index] != new:
124  _results[index] = new
125  callback(pipeline.run(new))
126  continue
127 
128  # the below code handles all of the other Event listeners.
129  # Here, _results takes on a new meaning:
130  # 0.0: input has been released; starting state.
131  # 1.0: input has been pressed but the PRESSED callback has not
132  # been fired yet.
133  # 2.0: input has been pressed and PRESSED callback has been
134  # fired.
135  # float() will convert boolean inputs to 0.0 or 1.0, keep range
136  # inputs the same, and then throw an error if neither of those
137  # work.
138 
139  # if the input is currently pressed on this tick
140  if float(new) != 0.0:
141 
142  # was the input released on the last tick?
143  if float(_results[index]) == 0.0:
144 
145  # if taplength is 0 and we are listening for the PRESSED
146  # event, call it right away.
147  # Making a special case for taplength of 0 is a minor
148  # performance improvement and saves one tick.
149  if taplength == 0 and event_name == Events.PRESSED:
150  callback(pipeline.run(new))
151  # set _results to 2.0; we've already fired the event.
152  _results[index] = 2.0
153 
154  else:
155  # record the current millis in the _times list.
156  # this is the exact moment the input has been pressed
157  # down.
158  _times[index] = get_current_millis()
159 
160  # and then change the _results to 1.0 to signal that we
161  # have recorded the press event
162  _results[index] = 1.0
163 
164  # If we've already recorded a press action, but we haven't fired
165  # the event yet:
166  elif float(_results[index]) == 1.0:
167 
168  # we just need to check if taplength has passed.
169  if (get_current_millis() - _times[index]) >= taplength:
170 
171  # if that period has passed, and we are listening for a
172  # PRESSED event, then fire the event and set _results
173  # to 2.0.
174  if event_name == Events.PRESSED:
175  callback(pipeline.run(new))
176  _results[index] = 2.0
177 
178  # if _results is 2.0, then we've already recorded the action
179  # and fired the event, so theres nothing to do here.
180 
181  # else if the input is 0.0:
182  else:
183 
184  # if the PRESSED event has already been fired, we now need to
185  # fire the corresponding RELEASED event.
186  if float(_results[index]) == 2.0:
187  if event_name == Events.RELEASED:
188  callback(pipeline.run(new))
189 
190  # and then reset _results back to 0.0
191  _results[index] = 0.0
192 
193  # if the PRESSED event has not yet been fired, but a press
194  # action has been recorded, we just need to fire a TAPPED event.
195  elif float(_results[index]) == 1.0:
196  if event_name == Events.TAPPED:
197  callback(pipeline.run(new))
198 
199  # and then reset _results back to 0.0
200  _results[index] = 0.0
201 
202  # if _results is 0.0, no press action has been recorded, so
203  # we just assume that that certain input hasn't been pressed.
204  # There's no release events that we need to handle.
205 
206 
207 
213 class Events(Enum):
214 
215 
217  ALWAYS = 0
218 
219 
221  CHANGED = 1
222 
223 
227  PRESSED = 2
228 
229 
231  RELEASED = 3
232 
233 
235  TAPPED = 4
236 
237 
238 
239 class Pipeline:
240 
241 
246  def __init__(self, *steps):
247  self.steps = []
248  self.add(steps)
249 
250 
262  def add(self, *steps):
263 
264  # iterate through steps.
265  for step in steps:
266 
267  # unpythonic but necessary instance checks. If it's a tuple or a
268  # list, break it up and pass it in to the `add` function again as
269  # varargs.
270  if isinstance(step, (tuple, list)):
271  self.add(*step)
272 
273  # else if the step is a pipeline, call the add method with that
274  # pipeline's steps.
275  elif isinstance(step, Pipeline):
276  self.add(step.steps)
277 
278  # else if it's any of those function types, just add it. The
279  # callable function is the lowest building block of a pipeline,
280  # and we can't recurse any lower.
281  elif isinstance(step, (types.FunctionType,
282  types.LambdaType,
283  types.BuiltinFunctionType)):
284  self.steps.append(step)
285 
286  # return this instance for optional chaining; however this shouldn't
287  # really be necessary given the varargs flexibility of the `add`
288  # function.
289  return self
290 
291 
297  def run(self, value):
298  x = value
299  for step in self.steps:
300  x = step(x)
301  return x
def tick()
Poll all the inputs.
Definition: iota.py:92
def __init__(self, steps)
Create a new pipeline.
Definition: iota.py:246
Poll inputs and listen for changes.
Definition: iota.py:38
A simplistic but flexible pipeline implementation.
Definition: iota.py:239
def add(self, steps)
Add steps to the pipline.
Definition: iota.py:262
def on(input_callback, event_name, pipeline, callback, taplength=150)
Add an input to the poller.
Definition: iota.py:63
def run(self, value)
Run this pipeline.
Definition: iota.py:297
Define high-level events for polling.
Definition: iota.py:213