Event Listening¶
Overview¶
|
Returns a |
|
Listen for messages until the |
TODO |
Details¶
- pyiced.every(duration, token)¶
Returns a
Subscription
that produces messages at a set interval.The first
Message
is produced after a duration, and then continues to produce more messages every duration after that.- Parameters
duration (Union[float, datetime.timedelta]) – The interval in seconds or as a duration. Must be at least 100 µs!
token (object) – The first item of the message tuple to send to the
pyiced.IcedApp.update()
.
- Returns
The new subscription.
Every “duration” a message
(token, instant)
is sent topyiced.IcedApp.update()
.See also
- Return type
Example
from datetime import datetime, timedelta from pyiced import column, every, IcedApp, Instant, text class SubscriptionExample(IcedApp): def __init__(self): self.__counter = 0 self.__instant = Instant() self.__last_instant = self.__instant self.__ts = datetime.now().time() self.__subscription = every(timedelta(milliseconds=16.667), 'tick') class settings: class window: size = (320, 64) def title(self): return 'Subscription Example' def view(self): duration = self.__instant - self.__last_instant return column([ text(f'Counter: {self.__counter:09d}'), text(f'Duration: {duration * 10**3:9.6f} ms'), text(f'Time: {self.__ts}') ]) def subscriptions(self): return [self.__subscription] def update(self, msg, clipboard): match msg: case ('tick', instant): self.__last_instant = self.__instant self.__counter += 1 self.__instant = instant self.__ts = datetime.now().time() if __name__ == '__main__': SubscriptionExample().run()
See also
- pyiced.stream(async_generator)¶
Listen for messages until the
asynchronous generator
is exhausted.- Parameters
async_generator (AsyncGenerator[Optional[object], None]) – An asynchronous generator of messages.
- Returns
The wrapped generator.
- Return type
Example
from asyncio import sleep from pyiced import column, IcedApp, stream, text class StreamExample(IcedApp): def __init__(self): self.__stream = stream(self.__generator_func()) self.__index = 0 class settings: class window: size = (640, 40) def title(self): return 'Stream Example' def view(self): return column([text(f'Index: {self.__index / 10:.1f}')]) def subscriptions(self): if self.__stream is not None: return [self.__stream] def update(self, msg, clipboard): match msg: case 'done': self.__stream = None case int(index): self.__index = index async def __generator_func(self): for i in range(1, 101): yield i await sleep(0.1) yield 'done' if __name__ == '__main__': StreamExample().run()