Files
bamboo-to-telagram/api-to-mqtt.py
2025-11-17 04:02:19 +00:00

113 lines
3.9 KiB
Python

# Bambu Lab API-to-MQTT bridge: a combination of the example code and
# AI-generated code for the MQTT part.
#
# Change: IP, SERIAL, ACCESS_CODE, MQTT_BROKER, MQTT_USER, MQTT_PASSWORD
# Optionally change: MQTT_PORT, MQTT_TOPIC, DEBUG
import time
import os
import datetime
import json
import paho.mqtt.client as mqtt
import bambulabs_api as bl # Assumes this is installed: pip install bambulabs_api
# --- Bambu Lab Printer Configuration ---
# NOTE: Replace these with your actual printer details.
IP = 'change me'
SERIAL = 'change me'
ACCESS_CODE = 'change me'

# --- MQTT Configuration ---
MQTT_BROKER = 'change me'
MQTT_PORT = 1883  # Standard MQTT port
MQTT_USER = 'change me'
MQTT_PASSWORD = 'change me'
MQTT_TOPIC = 'bambu/printer/status'

# Debug flag read from the DEBUG environment variable. When enabled, main()
# echoes every published payload to stdout. Enabled by default; any value
# other than 1/true/yes/on (case-insensitive) turns it off.
# The previous comparison against "false" was inverted: the default "True"
# disabled the flag and DEBUG=false enabled it.
DEBUG = os.getenv("DEBUG", "True").strip().lower() in ("1", "true", "yes", "on")
def on_connect(client, userdata, flags, rc):
    """Report the outcome of an MQTT connection attempt on stdout.

    Assigned as the MQTT client's ``on_connect`` callback in ``main()``.

    Args:
        client: The MQTT client instance that invoked the callback (unused).
        userdata: User data attached to the client (unused).
        flags: Response flags passed to the callback (unused).
        rc: Connection result code; ``0`` is treated as success, any other
            value is reported as a failure together with the code.
    """
    if rc == 0:
        print("MQTT Client connected successfully.")
    else:
        print(f"MQTT Client connection failed with code {rc}")
def _format_finish_time(remaining_time):
    """Return the estimated finish time as 'YYYY-MM-DD HH:MM:SS', or "NA".

    ``remaining_time`` is interpreted as minutes. ``None``, non-numeric
    values and values that are not positive all yield "NA" instead of
    raising, so a single odd reading cannot stop the publisher.
    """
    try:
        minutes_left = int(remaining_time)
    except (TypeError, ValueError):
        return "NA"
    if minutes_left <= 0:
        return "NA"
    finish_time = datetime.datetime.now() + datetime.timedelta(minutes=minutes_left)
    return finish_time.strftime("%Y-%m-%d %H:%M:%S")


def _read_printer_status(printer):
    """Query the printer once and return the status payload as a dict."""
    remaining_time = printer.get_time()
    # NOTE(review): assumes every value returned by the printer API is
    # JSON-serializable - confirm for get_state().
    return {
        "timestamp": datetime.datetime.now().isoformat(),
        "status": printer.get_state(),
        "percentage": printer.get_percentage(),
        "layer_num": printer.current_layer_num(),
        "total_layer_num": printer.total_layer_num(),
        "bed_temperature_c": printer.get_bed_temperature(),
        "nozzle_temperature_c": printer.get_nozzle_temperature(),
        "remaining_time_min": remaining_time,
        "finish_time": _format_finish_time(remaining_time),
    }


def main():
    """Publish the printer status to MQTT every 30 seconds until stopped.

    Connects to the MQTT broker and to the printer using the module-level
    configuration constants, then loops: wait 30 seconds, read the printer
    status, publish it as JSON to ``MQTT_TOPIC`` (QoS 1, not retained) and,
    when ``DEBUG`` is set, echo the payload to stdout.

    The loop ends on Ctrl+C or on the first unexpected exception (which is
    printed, not re-raised). Both connections are closed on the way out.
    """
    print('Starting Bambu Lab MQTT Status Publisher')

    # 1. Initialize MQTT Client
    # paho-mqtt 2.x takes a callback API version as the first argument;
    # VERSION1 matches the 4-argument on_connect used here. paho-mqtt 1.x has
    # no such enum, so fall back to the original constructor call there.
    if hasattr(mqtt, "CallbackAPIVersion"):
        mqtt_client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION1, client_id="bambu_publisher")
    else:
        mqtt_client = mqtt.Client(client_id="bambu_publisher")
    mqtt_client.on_connect = on_connect
    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)

    printer = None  # stays None if the printer object is never created
    try:
        print(f'Connecting to MQTT broker at {MQTT_BROKER}:{MQTT_PORT}...')
        mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
        mqtt_client.loop_start()

        # 2. Initialize Bambu Printer API
        print(f'Connecting to BambuLab printer (IP: {IP})')
        printer = bl.Printer(IP, ACCESS_CODE, SERIAL)
        printer.connect()

        while True:
            # Sleeping first also delays the first reading after connecting.
            time.sleep(30)

            # 3. Get the printer status data and
            # 4. package it into a JSON payload
            payload = _read_printer_status(printer)
            json_payload = json.dumps(payload)

            # 5. Publish to MQTT
            mqtt_client.publish(MQTT_TOPIC, json_payload, qos=1, retain=False)

            if DEBUG:
                # Log the data that was sent
                print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] Published to {MQTT_TOPIC}:")
                print(json.dumps(payload, indent=2))
                print("-" * 50)
    except KeyboardInterrupt:
        print("\nStopping publisher...")
    except Exception as e:
        print(f"\nAn error occurred: {e}")
    finally:
        # 6. Disconnect gracefully
        if printer is not None:
            try:
                printer.disconnect()
                print("Disconnected from Bambu Lab printer.")
            except Exception as e:
                # Keep going so the MQTT connection is still shut down.
                print(f"Error while disconnecting from printer: {e}")
        mqtt_client.loop_stop()
        mqtt_client.disconnect()
        print("Disconnected from MQTT broker.")
# Run the publisher only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()