mirror of
https://github.com/Astatin3/photonvision-2025.0.0-beta-6.git
synced 2026-06-08 16:18:03 -06:00
92 lines
2.4 KiB
Python
92 lines
2.4 KiB
Python
import argparse
|
|
from time import sleep
|
|
|
|
import ntcore
|
|
from tabulate import tabulate
|
|
|
|
|
|
def list_topics(inst: ntcore.NetworkTableInstance, root: str):
|
|
topics = inst.getTable(root).getTopics()
|
|
subtables = inst.getTable(root).getSubTables()
|
|
|
|
print(f"Topics under {root}")
|
|
print(
|
|
tabulate(
|
|
[
|
|
[topic.getName(), topic.getType().name, topic.getTypeString()]
|
|
for topic in topics
|
|
],
|
|
headers=["Topic Name", "Type", "Type String"],
|
|
)
|
|
)
|
|
print("")
|
|
print(f"Tables under {root}")
|
|
print(tabulate([[table] for table in subtables], headers=["Table Name"]))
|
|
print("")
|
|
|
|
|
|
def print_topic(inst: ntcore.NetworkTableInstance, topic: str):
|
|
sub = inst.getTopic(topic).genericSubscribe(
|
|
options=ntcore.PubSubOptions(sendAll=True, pollStorage=20)
|
|
)
|
|
print("")
|
|
print(f"Subscribed to {topic}, typestring '{sub.getTopic().getTypeString()}'")
|
|
print(f"Properties:")
|
|
print(sub.getTopic().getProperties())
|
|
print("")
|
|
|
|
start_time = ntcore._now()
|
|
count = 0
|
|
while True:
|
|
now = ntcore._now()
|
|
new_count = len(sub.readQueue())
|
|
count = count + new_count
|
|
|
|
hz = count / float(max(now - start_time, 0.1) * 1e-6)
|
|
|
|
print(f"{topic} = {sub.get().value()} (rate={hz:.1f}hz, samples={count})")
|
|
sleep(1)
|
|
|
|
|
|
def connect(inst: ntcore.NetworkTableInstance, server: str):
|
|
inst.stopServer()
|
|
inst.setServer(server)
|
|
inst.startClient4("catnt")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Cat a topic")
|
|
parser.add_argument(
|
|
"--echo", type=str, help="Fully qualified topic name", required=False
|
|
)
|
|
parser.add_argument(
|
|
"--server",
|
|
type=str,
|
|
default="127.0.0.1",
|
|
help="IP address of the NT4 server",
|
|
required=False,
|
|
)
|
|
parser.add_argument("--list", help="List all topics", required=False)
|
|
|
|
args = parser.parse_args()
|
|
inst = ntcore.NetworkTableInstance.getDefault()
|
|
|
|
connect(inst, args.server)
|
|
# retained to keep the subscriber alive
|
|
topicNameSubscriber = ntcore.MultiSubscriber(
|
|
inst, ["/"], ntcore.PubSubOptions(topicsOnly=True)
|
|
)
|
|
sleep(1)
|
|
|
|
while not inst.isConnected():
|
|
sleep(0.1)
|
|
|
|
if args.list:
|
|
list_topics(inst, args.list)
|
|
if args.echo:
|
|
print_topic(inst, args.echo)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|