# This is the bambo api to mqtt program and is a combo of the exsampal and ai to make the mqtt bit # # change IP, SERIAL, ACCESS_CODE, MQTT_BROKER, MQTT_USER, MQTT_PASSWORD # chanch if you wish MQTT_PORT, MQTT_TOPIC, DEBUG import time import os import datetime import json import paho.mqtt.client as mqtt import bambulabs_api as bl # Assumes this is installed: pip install bambulabs_api # --- Bambu Lab Printer Configuration --- # NOTE: Replace these with your actual printer details IP = 'change me' SERIAL = 'change me' ACCESS_CODE = 'change me' # --- MQTT Configuration --- MQTT_BROKER = 'change me' MQTT_PORT = 1883 # Standard MQTT port MQTT_USER = 'change me' MQTT_PASSWORD = 'change me' MQTT_TOPIC = 'bambu/printer/status' # Global flag to control the print/publish behavior DEBUG = os.getenv("DEBUG", "True").lower() == "false" def on_connect(client, userdata, flags, rc): """Callback for when the client receives a CONNACK response from the server.""" if rc == 0: print("MQTT Client connected successfully.") else: print(f"MQTT Client connection failed with code {rc}") def main(): print('Starting Bambu Lab MQTT Status Publisher') # 1. Initialize MQTT Client mqtt_client = mqtt.Client(client_id="bambu_publisher") mqtt_client.on_connect = on_connect mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD) try: print(f'Connecting to MQTT broker at {MQTT_BROKER}:{MQTT_PORT}...') mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) mqtt_client.loop_start() # 2. Initialize Bambu Printer API print(f'Connecting to BambuLab printer (IP: {IP})') printer = bl.Printer(IP, ACCESS_CODE, SERIAL) printer.connect() while True: time.sleep(30) # 3. Get the printer status data status = printer.get_state() percentage = printer.get_percentage() layer_num = printer.current_layer_num() total_layer_num = printer.total_layer_num() bed_temperature = printer.get_bed_temperature() nozzle_temperature = printer.get_nozzle_temperature() remaining_time = printer.get_time() finish_time_format = "NA" if remaining_time is not None and remaining_time > 0: finish_time = datetime.datetime.now() + datetime.timedelta( minutes=int(remaining_time)) finish_time_format = finish_time.strftime("%Y-%m-%d %H:%M:%S") # 4. Package data into a JSON payload payload = { "timestamp": datetime.datetime.now().isoformat(), "status": status, "percentage": percentage, "layer_num": layer_num, "total_layer_num": total_layer_num, "bed_temperature_c": bed_temperature, "nozzle_temperature_c": nozzle_temperature, "remaining_time_min": remaining_time, "finish_time": finish_time_format } json_payload = json.dumps(payload) # 5. Publish to MQTT mqtt_client.publish(MQTT_TOPIC, json_payload, qos=1, retain=False) if DEBUG: # Log the data that was sent print(f"[{datetime.datetime.now().strftime('%H:%M:%S')}] Published to {MQTT_TOPIC}:") print(json.dumps(payload, indent=2)) print("-" * 50) except KeyboardInterrupt: print("\nStopping publisher...") except Exception as e: print(f"\nAn error occurred: {e}") finally: # 6. Disconnect gracefully if 'printer' in locals(): printer.disconnect() print("Disconnected from Bambu Lab printer.") if 'mqtt_client' in locals(): mqtt_client.loop_stop() mqtt_client.disconnect() print("Disconnected from MQTT broker.") if __name__ == '__main__': main()