<<<

Build a circuitpython & micropython task scheduler for raspberry pi pico

What I want

With circuitpython or micropython on raspberry pi pico, there is only one cpu core will be used in common situation, eventhough, micropython do support second cpu core. So, I want a task scheduler on it, it will just like coroutine, every task will be like a forever loop, there will be yield in the task loop. Every time a task yield, the procedure will be return back to scheduler, the scheduler will schedule the next task to run. This kind scheduler will be better than a forever main loop, the cpu core will be used to run another task when the current running task suspended, and, it will be more easy to code with.

Design

Message: A message to send to another task

Condition: A signal to yield to suspend current task and back to scheduler with/without sending messages to another task

Task: A function with forever loop and yield sentence, it do all the workload

Scheduler: A class with forever loop procedure, do all the scheduling work, operate all the communications between tasks, and, statistic memory and cpu load

procedure

Project Structure

$ tree
.
├── code.py # example code.py
├── common.py # common methods
├── doc # document images
├── lib # lib directory, currently, no dependency needed
├── main.py # example main.py
└── scheduler.py # scheduler classes & methods

How to use it

1. clone the code

$ git clone git@github.com:fiefdx/pico_scheduler.git

# or fork then clone from yourself github

2. copy the code into raspberry pi pico

copy into pico

3. open Thonny connect to raspberry pi pico

open thonny

4. explain the code.py

import os
import gc

from scheduler import Scheluder, Condition, Task, Message # import basic components
from common import ticks_ms, ticks_add, ticks_diff, sleep_ms # import basic methods


# define monitor task function
# task & name parameters are required by task function signature
# other parameters can be customized
# monitor task send cpu & memory load message to display task every 2 seconds
def monitor(task, name, scheduler = None, display_id = None):
    while True:
        gc.collect()
        monitor_msg = "CPU:%3d%%  RAM:%3d%%" % (int(100 - scheduler.idle), int(100 - (scheduler.mem_free() * 100 / (264 * 1024))))
        yield Condition(sleep = 2000, send_msgs = [Message({"msg": monitor_msg}, receiver = display_id)]) # sleep 2000 ms, and, send message to display task


# define display task function
# display task wait for message to print
def display(task, name):
    while True:
        yield Condition(sleep = 0, wait_msg = True) # suspend and wait for message to recover
        msg = task.get_message()
        print(msg.content["msg"])


# define counter task function
# counter task send message to display task and sleep interval ms every 100 times
# otherwise, it will sleep interval ms
def counter(task, name, interval = 100, display_id = None):
    n = 0
    while True:
        if n % 100 == 0:
            yield Condition(sleep = interval, send_msgs = [Message({"msg": "counter: %06d" % n}, receiver = display_id)]) # sleep interval ms, and, send message to display task
        else:
            yield Condition(sleep = interval) # sleep interval ms
        n += 1


if __name__ == "__main__":
    try:
        s = Scheluder() # initiate a scheduler instance
        display_id = s.add_task(Task(display, "display")) # scheduler add display task
        monitor_id = s.add_task(Task(monitor, "monitor", kwargs = {"scheduler": s, "display_id": display_id})) # scheduler add monitor task
        counter_id = s.add_task(Task(counter, "counter", kwargs = {"interval": 10, "display_id": display_id})) # scheduler add counter task
        s.run() # run scheduler
    except Exception as e:
        print("main: %s" % str(e))

5. performance

performance

Run the 10 ms interval counter task, with monitor and display tasks, it will take 59% cpu workload and 30% ram workload, at default cpu frequency 133mhz

performance 200mhz

Run the 10 ms interval counter task, with monitor and display tasks, it will take 15% cpu workload and 30% ram workload, at default cpu frequency 200mhz

6. how to overclock raspberry pi pico with python

machine = None
microcontroller = None
try:
    import machine
except:
    try:
        import microcontroller
    except:
        print("no machine & microcontroller module support")
if machine:
    machine.freq(200000000) # for micropython use machine to set frequency
    print("freq: %s mhz" % (machine.freq() / 1000000))
if microcontroller:
    microcontroller.cpu.frequency = 200000000 # for circuitpython use microcontroller to set frequency
    print("freq: %s mhz" % (microcontroller.cpu.frequency / 1000000))