Event Listening

Overview

every(duration, token)

Returns a Subscription that produces messages at a set interval.

stream

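Turns an asynchronous generator into a subscription whose yielded values are delivered as messages (see the StreamExample under Details).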

Subscription

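The type of the objects returned from an application's subscriptions() method, with the predefined values NONE and UNCAPTURED (see Details).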

Details

pyiced.every(duration, token)

Returns a Subscription that produces messages at a set interval.

The first message is produced after one duration has elapsed, and further messages are produced every duration after that.

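Parameters

duration – The interval between two messages.
token – The first element of the (token, instant) message tuple that is sent to pyiced.IcedApp.update().

Returns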

The new subscription.

Every “duration” a message (token, instant) is sent to pyiced.IcedApp.update().

See also

Instant.

Return type

Subscription

from asyncio import open_connection
from contextlib import closing

from pyiced import (
    Align, Color, container, ContainerStyle, Font, IcedApp, Length, text,
)


class AsyncMessageExample(IcedApp):
    def __init__(self):
        self.__font = None

    class settings:
        class window:
            size = (640, 320)

    def title(self):
        return 'Asynchronous Messages'

    def new(self):
        return load_font()

    def update(self, msg, clipboard):
        match msg:
            case ('Font', font):
                self.__font = font

    def view(self):
        return container(
            text('Hello, world!', size=80, font=self.__font),
            style=ContainerStyle(
                text_color=Color(0.95, 0.87, 0.22),
                background=Color(0.38, 0.60, 0.23),
            ),
            padding=20, align_x=Align.CENTER, align_y=Align.CENTER,
            width=Length.FILL, height=Length.FILL,
        )


async def load_font():
    """Download the Yellowtail font over HTTPS and wrap it in a message.

    Sends a minimal HTTP/1.0 GET request, discards the response header and
    reads everything that follows as the font data.

    Returns:
        The tuple ``('Font', Font(FONT_NAME, data))``.

    Raises:
        ConnectionError: If the connection ends before the blank line that
            terminates the response header has been received.
    """
    FONT_NAME = 'Yellowtail'
    FONT_HOST = 'fonts.cdnfonts.com'
    FONT_PATH = '/s/16054/Yellowtail-Regular.ttf'

    query = (
        f"GET {FONT_PATH} HTTP/1.0\r\n"
        f"Host: {FONT_HOST}\r\n"
        # The connection option is spelled "close", not "closed".
        "Connection: close\r\n"
        "User-Agent: Mozilla/1.22 (compatible; MSIE 2.0; Windows 95)\r\n"
        "\r\n"
    ).encode('US-ASCII')

    reader, writer = await open_connection(FONT_HOST, 443, ssl=True)
    with closing(writer):
        writer.write(query)
        await writer.drain()

        # Skip the status line and the header fields up to the blank line.
        while (line := await reader.readline()) != b'\r\n':
            if not line:
                # readline() returns b'' at EOF; without this check the loop
                # would never terminate on a truncated response.
                raise ConnectionError(
                    'connection closed before the end of the HTTP header'
                )

        # HTTP/1.0: the body extends until the server closes the connection.
        data = await reader.read()
    await writer.wait_closed()

    return ('Font', Font(FONT_NAME, data))


if __name__ == '__main__':
    # Start the example only when this file is executed as a script.
    example = AsyncMessageExample()
    example.run()
pyiced.stream()

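Wraps an asynchronous generator so that it can be returned from subscriptions(). In the example below, every value the generator yields arrives as a message in update().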

from asyncio import sleep

from pyiced import column, IcedApp, stream, text


class StreamExample(IcedApp):
    """Display a counter that is driven by an asynchronous generator.

    The generator is wrapped with ``stream()`` and returned from
    ``subscriptions()`` until a ``'done'`` message has been handled.
    """

    class settings:
        class window:
            size = (640, 40)

    def __init__(self):
        self.__stream = stream(self.__generator_func())
        self.__index = 0

    def title(self):
        """Return the window title."""
        return 'Stream Example'

    def view(self):
        """Show the current index divided by ten, with one decimal place."""
        label = text(f'Index: {self.__index / 10:.1f}')
        return column([label])

    def subscriptions(self):
        """Return the stream in a list, or ``None`` once it was dropped."""
        if self.__stream is None:
            return None
        return [self.__stream]

    def update(self, msg, clipboard):
        """Store integer messages as the index; drop the stream on 'done'."""
        if msg == 'done':
            self.__stream = None
        elif isinstance(msg, int):
            self.__index = msg

    async def __generator_func(self):
        """Yield 1 to 100, sleeping 0.1 s after each, then yield 'done'."""
        for step in range(1, 101):
            yield step
            await sleep(0.1)
        yield 'done'


if __name__ == '__main__':
    # Start the example only when this file is executed as a script.
    example = StreamExample()
    example.run()
class pyiced.Subscription

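Objects of this type are returned from subscriptions(). The example below returns the predefined Subscription.UNCAPTURED and handles keyboard messages (key releases of F11 and Escape) in update().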

from pyiced import (
    Align, container, Message, IcedApp, Length, Subscription, text,
)


class FullscreenExample(IcedApp):
    def __init__(self):
        self.__fullscreen = False
        self.__should_exit = False

    class settings:
        class window:
            size = (640, 320)

    def subscriptions(self):
        return [Subscription.UNCAPTURED]

    def fullscreen(self):
        return self.__fullscreen

    def should_exit(self):
        return self.__should_exit

    def title(self):
        return self.__message

    def update(self, msg, clipboard):
        match msg:
            case Message(keyboard='keyreleased', key_code='F11'):
                self.__fullscreen = not self.__fullscreen
            case Message(keyboard='keyreleased', key_code='Escape'):
                self.__should_exit = True

    def view(self):
        return container(
            text(self.__message, size=40),
            padding=20, align_x=Align.CENTER, align_y=Align.CENTER,
            width=Length.FILL, height=Length.FILL,
        )

    @property
    def __message(self):
        if self.__fullscreen:
            return 'Fullscreen (press F11!)'
        else:
            return 'Windowed (press F11!)'


if __name__ == '__main__':
    # Start the example only when this file is executed as a script.
    example = FullscreenExample()
    example.run()
NONE = <pyiced.Subscription object>
UNCAPTURED = <pyiced.Subscription object>