import json
import time
import sys
import numpy as np
import robotpy_apriltag
import cv2
from cscore import CameraServer, VideoSource, UsbCamera, MjpegServer
from ntcore import NetworkTableInstance, EventFlags
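#   JSON format:
#   {
#       "team": <team number>,
#       "ntmode": <"client" or "server", "client" if unspecified>
#       "cameras": [
#           {
#               "name": <camera name>
#               "path": <path, e.g. "/dev/video0">
#               "pixel format": <"MJPEG", "YUYV", etc>   // optional
#               "width": <video mode width>
#               "height": <video mode height>
#               "fps": <video mode fps>
#               "brightness": <percentage brightness>    // optional
#               "white balance": <"auto", "hold", value> // optional
#               "exposure": <"auto", "hold", value>      // optional
#               "properties": [                          // optional
#                   {
#                       "name": <property name>
#                       "value": <property value>
#                   }
#               ],
#               "stream": {                              // optional
#                   "properties": [
#                       {
#                           "name": <stream property name>
#                           "value": <stream property value>
#                       }
#                   ]
#               }
#           }
#       ]
#       "switched cameras": [
#           {
#               "name": <virtual camera name>
#               "key": <network table key used for selection>
#               // if NT value is a string, it's treated as a name
#               // if NT value is a double, it's treated as an integer index
#           }
#       ]
#   }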
# Default path of the JSON configuration file; the main script replaces it
# with the first command-line argument when one is given.
configFile = "/boot/frc.json"


class CameraConfig:
    """Plain attribute holder for one camera's parsed configuration.

    The config readers attach attributes such as ``name``, ``path``, ``key``,
    ``config`` and ``streamConfig`` to instances after construction.
    """
    pass


# Team number from the configuration file; stays None until readConfig() runs.
team = None
# True when the config asks for NetworkTables server mode ("ntmode": "server").
server = False
# CameraConfig objects appended by readCameraConfig().
cameraConfigs = []
# CameraConfig objects appended by readSwitchedCameraConfig().
switchedCameraConfigs = []
# Camera objects returned by startCamera(), appended by the main script in
# the same order as cameraConfigs.
cameras = []
def parseError(str):
    """Report a configuration parse error on standard error.

    Args:
        str: Description of the problem. It is appended to a prefix that
            names the configuration file. The parameter name shadows the
            builtin but is kept so existing callers are unaffected.
    """
    print("config error in '" + configFile + "': " + str, file=sys.stderr)
def readCameraConfig(config):
    """Read a single camera configuration and record it in cameraConfigs.

    Args:
        config: One entry of the "cameras" array from the configuration
            file; it is indexed by key and queried with ``.get``.

    Returns:
        True when both "name" and "path" were present and the camera was
        appended to ``cameraConfigs``; False after reporting a parse error.
    """
    cam = CameraConfig()

    # name
    try:
        cam.name = config["name"]
    except KeyError:
        parseError("could not read camera name")
        return False

    # path
    try:
        cam.path = config["path"]
    except KeyError:
        parseError("camera '{}': could not read path".format(cam.name))
        return False

    # stream properties (optional, None when absent)
    cam.streamConfig = config.get("stream")

    # keep the whole entry; startCamera() passes it on as the camera's JSON
    cam.config = config

    cameraConfigs.append(cam)
    return True
def readSwitchedCameraConfig(config):
    """Read a single switched camera configuration and record it.

    Args:
        config: One entry of the "switched cameras" array from the
            configuration file; it is indexed by key.

    Returns:
        True when both "name" and "key" were present and the entry was
        appended to ``switchedCameraConfigs``; False after reporting a
        parse error.
    """
    cam = CameraConfig()

    # name
    try:
        cam.name = config["name"]
    except KeyError:
        parseError("could not read switched camera name")
        return False

    # key (NetworkTables entry that startSwitchedCamera() listens on)
    try:
        cam.key = config["key"]
    except KeyError:
        parseError("switched camera '{}': could not read key".format(cam.name))
        return False

    switchedCameraConfigs.append(cam)
    return True
def readConfig():
    """Read the configuration file named by ``configFile``.

    Sets the module-level ``team`` and ``server`` values and fills
    ``cameraConfigs`` / ``switchedCameraConfigs`` through the per-entry
    readers.

    Returns:
        True when the file was read and every required field was present;
        False after printing a diagnostic to standard error otherwise.
    """
    global team
    global server

    # parse file
    try:
        with open(configFile, "rt", encoding="utf-8") as f:
            j = json.load(f)
    except OSError as err:
        print("could not open '{}': {}".format(configFile, err), file=sys.stderr)
        return False
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        # A malformed file is a configuration problem, not a crash: report it
        # like every other config error instead of letting a traceback escape.
        parseError("invalid JSON: {}".format(err))
        return False

    # top level must be an object
    if not isinstance(j, dict):
        parseError("must be JSON object")
        return False

    # team number
    try:
        team = j["team"]
    except KeyError:
        parseError("could not read team number")
        return False

    # ntmode (optional)
    if "ntmode" in j:
        ntmode = j["ntmode"]
        if ntmode.lower() == "client":
            server = False
        elif ntmode.lower() == "server":
            server = True
        else:
            # An unknown mode is reported but is not fatal; the current
            # value of `server` is left untouched.
            parseError("could not understand ntmode value '{}'".format(ntmode))

    # cameras
    try:
        cameraEntries = j["cameras"]
    except KeyError:
        parseError("could not read cameras")
        return False
    for camera in cameraEntries:
        if not readCameraConfig(camera):
            return False

    # switched cameras (optional)
    if "switched cameras" in j:
        for camera in j["switched cameras"]:
            if not readSwitchedCameraConfig(camera):
                return False

    return True
def startCamera(config):
    """Start running one USB camera and its automatic capture stream.

    Args:
        config: Camera configuration carrying ``name``, ``path``, ``config``
            and ``streamConfig`` attributes, as built by readCameraConfig().

    Returns:
        The UsbCamera created for ``config.path``.
    """
    print("Starting camera '{}' on {}".format(config.name, config.path))
    camera = UsbCamera(config.name, config.path)
    # Named streamServer so it is not confused with the module-level
    # `server` flag (NetworkTables mode).
    streamServer = CameraServer.startAutomaticCapture(camera=camera)

    # The whole config-file entry is handed to the camera as its JSON settings.
    camera.setConfigJson(json.dumps(config.config))
    camera.setConnectionStrategy(VideoSource.ConnectionStrategy.kConnectionKeepOpen)

    # Stream settings are optional in the config file.
    if config.streamConfig is not None:
        streamServer.setConfigJson(json.dumps(config.streamConfig))

    return camera
def startSwitchedCamera(config):
    """Start a switched camera whose source is chosen by a NetworkTables entry.

    A listener is registered on the entry named by ``config.key``. When the
    entry holds a number, it is truncated to an integer index into
    ``cameras``; when it holds a string, it is matched against the names in
    ``cameraConfigs``. Out-of-range indices and unknown names are ignored.

    Args:
        config: Switched camera configuration carrying ``name`` and ``key``
            attributes, as built by readSwitchedCameraConfig().

    Returns:
        The server object returned by ``CameraServer.addSwitchedCamera``.
    """
    print("Starting switched camera '{}' on {}".format(config.name, config.key))
    server = CameraServer.addSwitchedCamera(config.name)

    def listener(event):
        """Switch the served source to match the entry's new value."""
        data = event.data
        if data is None:
            return
        value = data.value.value()

        if isinstance(value, (int, float)):
            # Integers and doubles are both treated as a camera index.
            index = int(value)
            if 0 <= index < len(cameras):
                server.setSource(cameras[index])
        elif isinstance(value, str):
            # cameras[i] was started from cameraConfigs[i], so the position
            # of the matching name selects the camera.
            for index, cameraConfig in enumerate(cameraConfigs):
                if value == cameraConfig.name:
                    server.setSource(cameras[index])
                    break

    NetworkTableInstance.getDefault().addListener(
        NetworkTableInstance.getDefault().getEntry(config.key),
        EventFlags.kImmediate | EventFlags.kValueAll,
        listener)

    return server
# Script entry point: read the configuration, bring up NetworkTables, start
# every configured camera, then idle forever.
if __name__ == "__main__":
    # An optional first argument overrides the default configuration path.
    if len(sys.argv) >= 2:
        configFile = sys.argv[1]

    # read configuration
    if not readConfig():
        sys.exit(1)

    # start NetworkTables
    ntinst = NetworkTableInstance.getDefault()
    if server:
        print("Setting up NetworkTables server")
        ntinst.startServer()
    else:
        print("Setting up NetworkTables client for team {}".format(team))
        ntinst.startClient4("wpilibpi")
        ntinst.setServerTeam(team)
        ntinst.startDSClient()

    # start cameras
    # work around wpilibsuite/allwpilib#5055
    CameraServer.setSize(CameraServer.kSize160x120)
    for config in cameraConfigs:
        cameras.append(startCamera(config))

    # start switched cameras
    for config in switchedCameraConfigs:
        startSwitchedCamera(config)

    # loop forever
    while True:
        time.sleep(10)
I downloaded this example code from wpilib.local (it is for the Raspberry Pi). The problem is that I don't know how to manipulate that camera in OpenCV.