Camera OpenCV

import json
import time
import sys
import numpy as np
import robotpy_apriltag
import cv2
from cscore import CameraServer, VideoSource, UsbCamera, MjpegServer
from ntcore import NetworkTableInstance, EventFlags

# JSON format:
#   {
#       "team": <team number>,
#       "ntmode": <"client" or "server", "client" if unspecified>
#       "cameras": [
#           {
#               "name": <camera name>
#               "path": <path, e.g. "/dev/video0">
#               "pixel format": <"MJPEG", "YUYV", etc>   // optional
#               "width": <video mode width>              // optional
#               "height": <video mode height>            // optional
#               "fps": <video mode fps>                  // optional
#               "brightness": <percentage brightness>    // optional
#               "white balance": <"auto", "hold", value> // optional
#               "exposure": <"auto", "hold", value>      // optional
#               "properties": [                          // optional
#                   {
#                       "name": <property name>
#                       "value": <property value>
#                   }
#               ],
#               "stream": {                              // optional
#                   "properties": [
#                       {
#                           "name": <stream property name>
#                           "value": <stream property value>
#                       }
#                   ]
#               }
#           }
#       ]
#       "switched cameras": [
#           {
#               "name": <virtual camera name>
#               "key": <network table key used for selection>
#               // if NT value is a string, it's treated as a name
#               // if NT value is a double, it's treated as an integer index
#           }
#       ]
#   }

configFile = "/boot/frc.json"

class CameraConfig: pass

team = None
server = False
cameraConfigs = []
switchedCameraConfigs = []
cameras = []

def parseError(str):
    """Report parse error."""
    print("config error in '" + configFile + "': " + str, file=sys.stderr)

def readCameraConfig(config):
    """Read single camera configuration."""
    cam = CameraConfig()

    # name
    try:
        cam.name = config["name"]
    except KeyError:
        parseError("could not read camera name")
        return False

    # path
    try:
        cam.path = config["path"]
    except KeyError:
        parseError("camera '{}': could not read path".format(cam.name))
        return False

    # stream properties
    cam.streamConfig = config.get("stream")

    cam.config = config

    cameraConfigs.append(cam)
    return True

def readSwitchedCameraConfig(config):
    """Read single switched camera configuration."""
    cam = CameraConfig()

    # name
    try:
        cam.name = config["name"]
    except KeyError:
        parseError("could not read switched camera name")
        return False

    # key
    try:
        cam.key = config["key"]
    except KeyError:
        parseError("switched camera '{}': could not read key".format(cam.name))
        return False

    switchedCameraConfigs.append(cam)
    return True

def readConfig():
    """Read configuration file."""
    global team
    global server

    # parse file
    try:
        with open(configFile, "rt", encoding="utf-8") as f:
            j = json.load(f)
    except OSError as err:
        print("could not open '{}': {}".format(configFile, err), file=sys.stderr)
        return False

    # top level must be an object
    if not isinstance(j, dict):
        parseError("must be JSON object")
        return False

    # team number
    try:
        team = j["team"]
    except KeyError:
        parseError("could not read team number")
        return False

    # ntmode (optional)
    if "ntmode" in j:
        str = j["ntmode"]
        if str.lower() == "client":
            server = False
        elif str.lower() == "server":
            server = True
        else:
            parseError("could not understand ntmode value '{}'".format(str))

    # cameras
    try:
        cameras = j["cameras"]
    except KeyError:
        parseError("could not read cameras")
        return False
    for camera in cameras:
        if not readCameraConfig(camera):
            return False

    # switched cameras
    if "switched cameras" in j:
        for camera in j["switched cameras"]:
            if not readSwitchedCameraConfig(camera):
                return False

    return True

def startCamera(config):
    """Start running the camera."""
    print("Starting camera '{}' on {}".format(config.name, config.path))
    camera = UsbCamera(config.name, config.path)
    server = CameraServer.startAutomaticCapture(camera=camera)

    camera.setConfigJson(json.dumps(config.config))
    camera.setConnectionStrategy(VideoSource.ConnectionStrategy.kConnectionKeepOpen)

    if config.streamConfig is not None:
        server.setConfigJson(json.dumps(config.streamConfig))

    return camera

def startSwitchedCamera(config):
    """Start running the switched camera."""
    print("Starting switched camera '{}' on {}".format(config.name, config.key))
    server = CameraServer.addSwitchedCamera(config.name)

    def listener(event):
        data = event.data
        if data is not None:
            value = data.value.value()
            if isinstance(value, int):
                if value >= 0 and value < len(cameras):
                    server.setSource(cameras[value])
            elif isinstance(value, float):
                i = int(value)
                if i >= 0 and i < len(cameras):
                    server.setSource(cameras[i])
            elif isinstance(value, str):
                for i in range(len(cameraConfigs)):
                    if value == cameraConfigs[i].name:
                        server.setSource(cameras[i])
                        break

    NetworkTableInstance.getDefault().addListener(
        NetworkTableInstance.getDefault().getEntry(config.key),
        EventFlags.kImmediate | EventFlags.kValueAll,
        listener)

    return server

if __name__ == "__main__":
    if len(sys.argv) >= 2:
        configFile = sys.argv[1]

    # read configuration
    if not readConfig():
        sys.exit(1)

    # start NetworkTables
    ntinst = NetworkTableInstance.getDefault()
    if server:
        print("Setting up NetworkTables server")
        ntinst.startServer()
    else:
        print("Setting up NetworkTables client for team {}".format(team))
        ntinst.startClient4("wpilibpi")
        ntinst.setServerTeam(team)
        ntinst.startDSClient()

    # start cameras
    # work around wpilibsuite/allwpilib#5055
    CameraServer.setSize(CameraServer.kSize160x120)
    for config in cameraConfigs:
        cameras.append(startCamera(config))

    # start switched cameras
    for config in switchedCameraConfigs:
        startSwitchedCamera(config)

    # loop forever
    while True:
        time.sleep(10)

I have this example code downloaded from wpilib.local (it's for the Raspberry Pi). The problem is that I don't know how to manipulate that camera in OpenCV.

The Basic Vision Example in the FIRST Robotics Competition documentation has a slightly simpler program, but it should show you the necessary concepts.
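Roughly, the pattern that example uses for getting frames into OpenCV looks like this (a minimal sketch, not drop-in code: the "Processed" stream name and the 640x480 size are assumptions you would match to your own camera config). It would replace the "loop forever" block at the bottom of your script:

    # Pull frames from the camera started above into OpenCV, process them,
    # and republish the result as a second MJPEG stream.
    cvSink = CameraServer.getVideo()                       # sink for the camera feed
    outputStream = CameraServer.putVideo("Processed", 640, 480)
    img = np.zeros(shape=(480, 640, 3), dtype=np.uint8)    # preallocated frame buffer

    while True:
        t, img = cvSink.grabFrame(img)                     # t == 0 means the grab failed
        if t == 0:
            outputStream.notifyError(cvSink.getError())
            continue
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)       # any OpenCV processing goes here
        outputStream.putFrame(gray)

grabFrame reuses the preallocated buffer instead of allocating a new image every frame, and putVideo gives you a second stream you can view from the Pi's web dashboard.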


Can you describe more about what manipulation you need to perform?

A few other tips to help get answers faster:

  1. When making a post, consider using fenced code blocks to put your code snippets in, it’ll make things much easier for others to read.
  2. Consider using github.com to post full code samples; it will also help you track your code and its changes. When posting in forums, provide only the portions you believe are most relevant to the question. This serves two purposes: it reduces the work others have to do to understand your meaning, and it forces you to think through which portions of your code are relevant (which goes a long way toward forming a better question).

I want to detect AprilTags with robotpy_apriltag, but I have to manipulate the camera in OpenCV. I have already done that, but now I have a problem with this:

apriltag.c:224:quick_decode_init(): Failed to allocate hamming decode table.

This is the code:

What version of robotpy-apriltag do you have installed? Make sure you’ve got 2024.3.1.
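If you're not sure what's installed on the Pi, a quick way to check (run it with the Pi's Python; importlib.metadata is in the standard library on Python 3.8+):

    from importlib.metadata import version
    print(version("robotpy-apriltag"))   # should print 2024.3.1 or newer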

I'm having this problem on the Raspberry Pi. I uploaded this code, but I get this error:

apriltag.c:224:quick_decode_init(): Failed to allocate hamming decode table.

This is the code:

I am also having the same error (apriltag.c:224:quick_decode_init(): Failed to allocate hamming decode table) when using cameras + AprilTags. I didn't have this error before I updated to WPILib 2024 (Java).

I, too, am getting this error with the Java 2024 WPILib VS Code example program.
Did either @Esdras01 or @fisherjacobc get this working? Thanks.

That's a memory allocation error. You could reduce the bitsCorrected parameter to AprilTagDetector.addFamily to reduce the memory required.

The 2023 example used 0 bits corrected since it was a much smaller tag family.
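For example, something like this where you create the detector (a sketch; tag36h11 and the value 1 are just the family and setting discussed in this thread, not a recommendation from the example itself):

    import robotpy_apriltag

    detector = robotpy_apriltag.AprilTagDetector()
    # Second argument is bitsCorrected: 0 or 1 keeps the tag36h11 decode table
    # small enough for a Pi or roboRIO; larger values need far more memory.
    detector.addFamily("tag36h11", 1)

    # later, per frame (gray is an 8-bit grayscale OpenCV image):
    # detections = detector.detect(gray)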


What is the recommended option for that?

With a program slightly larger than the example, on the original roboRIO (v1), 0 bits and 1 bit worked; 2 and 3 did not. Maybe on a roboRIO v2 or an RPi you can go higher.

It appears that for 36h11:
Hamming 1 allocates (587 + 587 * 36) * 3 * 12 = 781 KB
Hamming 2 allocates (587 + 587 * 36 + 587 * 36 * 35) * 3 * 12 = 27.4 MB
Hamming 3 allocates (587 + 587 * 36 + 587 * 36 * 35 + 587 * 36 * 35 * 34) * 3 * 12 = 932 MB

(reference: apriltag.c source code)
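Those numbers are easy to reproduce (a quick sketch mirroring the capacity computation in quick_decode_init, assuming the 12-byte table entries and 3x over-allocation used above):

    ncodes, nbits = 587, 36      # tag36h11 family

    def table_bytes(maxhamming):
        capacity = ncodes
        if maxhamming >= 1:
            capacity += ncodes * nbits
        if maxhamming >= 2:
            capacity += ncodes * nbits * (nbits - 1)
        if maxhamming >= 3:
            capacity += ncodes * nbits * (nbits - 1) * (nbits - 2)
        return capacity * 3 * 12  # 3x over-allocation, 12 bytes per entry

    for h in (1, 2, 3):
        print(h, round(table_bytes(h) / 1e6, 1), "MB")   # ~0.8, ~27.4, ~932.7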

The Rio 1 only has 256 MB of total memory. The Rio 2 only has 512 MB.

So I think the example is simply broken as it uses hamming 3; while it might work in sim, it will not work on Rio. It should be changed to use hamming 2 on Rio 2 and hamming 1 on Rio 1. I’ve opened an issue for this on allwpilib.

