You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
265 lines
7.8 KiB
265 lines
7.8 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Circular Buffer - Step-by-Step Tutorial
========================================
Follow the examples or type your own commands!
|
|
"""
|
|
|
|
from circular_buffer import cb_init, cb_write, cb_copy_span, cb_w_abs, cb_overflows, cb_fill_bytes, cb_fill_pct, Status
|
|
|
|
|
|
def show_buffer(buf, label="Buffer"):
    """Print a text picture of the buffer with a marker under the write slot.

    Output is a row of ``[x]`` cells (one per slot), a row with an arrow
    under the slot at ``cb_w_abs(buf) % buf.capacity``, and two summary
    lines (fill, total written, percent full, overflow count).

    Args:
        buf: Buffer object exposing ``capacity`` and an indexable ``buffer``
            of integer byte values; it is also passed to the ``cb_*`` query
            helpers imported at the top of the file.
        label: Heading printed above the picture.
    """
    capacity = buf.capacity
    w_abs = cb_w_abs(buf)
    w_overflow = cb_overflows(buf)
    fill_pct = cb_fill_pct(buf)
    fill = cb_fill_bytes(buf)
    # Guard the modulo so a zero-capacity buffer cannot raise ZeroDivisionError.
    pos = w_abs % capacity if capacity else 0

    # Render every slot first so the marker row can mirror each cell's width.
    cells = []
    for i in range(capacity):
        b = buf.buffer[i]
        if b == 0:
            # A zero byte is drawn as an empty slot.
            cells.append("[ ]")
        elif 32 <= b < 127:
            cells.append(f"[{chr(b)}]")
        else:
            # Non-printable bytes are shown as two hex digits (4-wide cell).
            cells.append(f"[{b:02X}]")

    # Pad each marker slot to its cell's width; a fixed one-space filler would
    # push the arrow left of the real write position.
    markers = [
        ("↑" if i == pos else "").center(len(cell))
        for i, cell in enumerate(cells)
    ]

    print(f"\n {label}:")
    print(" " + "".join(cells) + f" ← capacity={capacity}")
    print(" " + "".join(markers) + f" ← write position={pos}")
    print(f" Fill: {fill}/{capacity} bytes | Total written: {w_abs} | Percent Full: {fill_pct}\n")
    print(f" Overflow Count : {w_overflow} \n")
|
|
|
|
|
|
def tutorial_mode():
    """Walk the user through a scripted circular-buffer session.

    Creates an 8-byte buffer, writes 'ABC' and 'DEFG', reads everything
    back, then writes 'HIJKL' to force an overflow and reads the bytes that
    survive. Waits for ENTER between steps.

    The "how full / what was lost / what remains" messages are computed from
    the bytes this function has written rather than hard-coded, so the
    narration stays truthful if the payloads or the capacity are edited.
    """
    print("\n" + "="*70)
    print(" TUTORIAL MODE - Follow Along!")
    print("="*70 + "\n")

    # Create small buffer
    capacity = 8
    print(f"Step 1: Create buffer with capacity={capacity}")
    _, buf = cb_init(capacity)
    print(" → Created!")
    show_buffer(buf, "Empty Buffer")

    input("Press ENTER to continue...")

    # Everything written so far, oldest first; used only for the narration.
    history = b""

    # Write ABC
    print("\nStep 2: Write 'ABC'")
    data = b"ABC"
    print(f" → cb_write(buffer, {data})")
    _, written = cb_write(buf, data)
    history += data
    print(f" → Wrote {written} bytes")
    show_buffer(buf, "After Writing 'ABC'")

    input("Press ENTER to continue...")

    # Write more
    print("\nStep 3: Write 'DEFG'")
    data = b"DEFG"
    print(f" → cb_write(buffer, {data})")
    _, written = cb_write(buf, data)
    history += data
    print(f" → Wrote {written} bytes")
    show_buffer(buf, "After Writing 'DEFG'")
    # 7 of 8 bytes are used here, so the buffer is nearly full, not full.
    print(f" Notice: Buffer is almost full! ({len(history)}/{capacity} bytes)")

    input("Press ENTER to continue...")

    # Read data
    print("\nStep 4: Read all data")
    w_abs = cb_w_abs(buf)
    print(f" → cb_copy_span(buffer, 0, {w_abs})")
    _, data = cb_copy_span(buf, 0, w_abs)
    print(f" → Read: {data}")
    print(f" → Decoded: '{data.decode()}'")
    show_buffer(buf, "Buffer Still Has Data")
    print(" Note: Reading doesn't remove data!")

    input("Press ENTER to continue...")

    # Overflow
    print("\nStep 5: Write more data than capacity (OVERFLOW!)")
    data = b"HIJKL"
    free = capacity - len(history)
    print(f" → cb_write(buffer, {data}) - that's {len(data)} more bytes!")
    print(f" → Only {free} free byte(s) left, so oldest data will be overwritten...")
    _, written = cb_write(buf, data)
    history += data
    print(f" → Wrote {written} bytes")
    show_buffer(buf, "After Overflow")
    # Only the newest `capacity` bytes survive; everything before is lost.
    lost = history[:-capacity]
    kept = history[-capacity:]
    print(f" Notice: '{lost.decode()}' got overwritten! Oldest data lost!")
    print(f" Buffer now has: '{kept.decode()}' (last {capacity} bytes)")

    input("Press ENTER to continue...")

    # Read after overflow
    print("\nStep 6: Read data after overflow")
    w_abs = cb_w_abs(buf)
    start = w_abs - capacity  # Only keep what fits
    print(f" → cb_copy_span(buffer, {start}, {w_abs})")
    _, data = cb_copy_span(buf, start, w_abs)
    print(f" → Read: {data}")
    print(f" → Decoded: '{data.decode()}'")
    print("\n ✅ This is how circular buffer works!")
    print(" • Fixed size (no growing)")
    print(" • Drop oldest when full")
    print(" • Keep most recent data")
|
|
|
|
|
|
def playground_mode():
    """Run an interactive prompt for experimenting with a 12-byte buffer.

    Commands (first word, case-insensitive):
        w <text>  encode <text> as ASCII and write it
        r         read everything still held in the buffer
        r <n>     read the last n bytes (clamped to what is still held)
        s         show the buffer
        c         zero the storage and reset the counters
        q         quit

    Ctrl-C or end-of-input also leaves the loop. Any other exception raised
    while handling a command is reported and the prompt continues.
    """
    print("\n" + "="*70)
    print(" PLAYGROUND MODE - Type Your Own Commands!")
    print("="*70 + "\n")

    print("Commands:")
    print(" w <text> - Write text (e.g., 'w HELLO')")
    print(" r - Read all data")
    print(" r <n> - Read last n bytes")
    print(" s - Show status")
    print(" c - Clear (reset) buffer")
    print(" q - Quit")
    print()

    # Create buffer
    capacity = 12
    _, buf = cb_init(capacity)
    print(f"Created buffer: capacity={capacity}")
    show_buffer(buf, "Initial State")

    while True:
        try:
            cmd = input("cmd> ").strip()

            if not cmd:
                continue

            parts = cmd.split(maxsplit=1)
            action = parts[0].lower()

            if action == 'q':
                print("Goodbye!\n")
                break

            elif action == 'w':
                if len(parts) < 2:
                    print("Usage: w <text>")
                    continue
                text = parts[1]
                # Non-ASCII text raises UnicodeEncodeError, which the generic
                # handler at the bottom of the loop reports.
                data = text.encode('ascii')
                print(f"\nWriting: {data} ('{text}')")
                _, written = cb_write(buf, data)
                print(f"Wrote: {written} bytes")
                show_buffer(buf)

            elif action == 'r':
                w_abs = cb_w_abs(buf)
                if w_abs == 0:
                    print("\nBuffer is empty!")
                    continue

                # Absolute offset of the oldest byte that can still be held.
                oldest = max(0, w_abs - capacity)

                if len(parts) > 1:
                    try:
                        n = int(parts[1])
                    except ValueError:
                        print("Invalid number")
                        continue
                    if n < 0:
                        # A negative count would put start past the write position.
                        print("Invalid number")
                        continue
                    # Never reach back before the oldest retained byte.
                    start = max(oldest, w_abs - n)
                else:
                    start = oldest

                _, data = cb_copy_span(buf, start, w_abs)
                print(f"\nRead: {data}")
                print(f"Text: '{data.decode('ascii', errors='replace')}'")

            elif action == 's':
                show_buffer(buf, "Current State")

            elif action == 'c':
                # Reset in place: zero the storage and both counters.
                buf.buffer[:] = b'\x00' * capacity
                buf.write_absolute = 0
                buf.overflows = 0
                print("\nBuffer cleared!")
                show_buffer(buf)

            else:
                print(f"Unknown command: {action}")

        except (KeyboardInterrupt, EOFError):
            print("\n\nGoodbye!\n")
            break
        except Exception as e:
            # Keep the session alive after a failed command.
            print(f"Error: {e}")
|
|
|
|
|
|
def quick_test():
    """Play a short, non-interactive demo with one-second pauses.

    Builds a 10-byte buffer, performs three scripted writes (the last one
    overflows), shows the buffer after each, and finally prints the span
    ``[total - 10, total)`` copied back out of the buffer.
    """
    import time

    rule = "=" * 70
    print("\n" + rule)
    print(" QUICK TEST - Watch What Happens!")
    print(rule + "\n")

    _, ring = cb_init(10)
    print("Created buffer: capacity=10")
    show_buffer(ring)
    time.sleep(1)

    # (announcement, payload, optional remark printed after the picture)
    script = (
        ("Writing: ABC", b"ABC", None),
        ("Writing: DEFGH", b"DEFGH", None),
        (
            "Writing: IJKLM (OVERFLOW!)",
            b"IJKLM",
            "Notice: Oldest data (ABC) was overwritten!",
        ),
    )
    for announcement, payload, remark in script:
        print(announcement)
        cb_write(ring, payload)
        show_buffer(ring)
        if remark is not None:
            print(remark)
        time.sleep(1)

    total = cb_w_abs(ring)
    _, tail = cb_copy_span(ring, total - 10, total)
    print(f"\nFinal data: {tail.decode()}")
    print("That's the last 10 bytes written!\n")
|
|
|
|
|
|
def main():
    """Show the mode menu, read one choice, and run the selected mode.

    '1' runs the tutorial, '2' the playground, '3' the quick test; 'q'
    (any case) quits. Any other answer prints an "invalid choice" message
    and returns. End-of-input or Ctrl-C at the prompt exits with a goodbye
    message instead of a traceback, matching how the playground prompt
    behaves.
    """
    print("\n" + "="*70)
    print(" CIRCULAR BUFFER - LEARNING PLAYGROUND")
    print("="*70)
    print("\nChoose a mode:")
    print(" 1. Tutorial (step-by-step guided)")
    print(" 2. Playground (type your own commands)")
    print(" 3. Quick Test (watch automated demo)")
    print(" q. Quit")
    print()

    try:
        choice = input("Select [1/2/3/q]: ").strip()
    except (EOFError, KeyboardInterrupt):
        # Closed stdin or Ctrl-C at the menu: leave quietly.
        print("\n\nGoodbye!\n")
        return

    if choice == '1':
        tutorial_mode()
    elif choice == '2':
        playground_mode()
    elif choice == '3':
        quick_test()
    elif choice.lower() == 'q':
        print("\nGoodbye!\n")
    else:
        print("\nInvalid choice. Goodbye!\n")
|
|
|
|
|
|
# Script entry point: show the menu only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|