Upsert Session: Handling New Checked Exceptions
Handling TunnelException in Upsert Session
Problem
To support upsert and delete operations using MaxCompute’s UpsertSession, we need to call the following methods:
```java
void upsert(Record record) throws IOException, TunnelException;
void upsert(Record record, List<String> columns) throws IOException, TunnelException;
void delete(Record record) throws IOException, TunnelException;
```
However, our current write method is defined as:
```java
public void write(SeaTunnelRow seaTunnelRow) throws IOException;
```
Since TunnelException is a checked exception, we cannot throw it from write without modifying the method signature.
Options
To resolve this, we have three options:
1. Propagate TunnelException by modifying the method signature:
   ```java
   public void write(SeaTunnelRow row) throws IOException, TunnelException;
   ```
   - ✅ Clearly communicates what the method may throw
   - ❌ Requires changing the interface if write is declared in a parent interface or abstract class, because an overriding method cannot declare additional checked exceptions
2. Wrap TunnelException in an unchecked exception:
   ```java
   catch (TunnelException e) {
       throw new MaxcomputeConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
   }
   ```
   - ✅ Does not require modifying the interface
   - ✅ Preserves the original exception as the cause for debugging
   - ❌ The compiler no longer forces callers to handle the failure; it only surfaces at runtime
3. Convert TunnelException to an IOException. If the semantics align, wrap it:
   ```java
   throw new IOException("Failed to write due to TunnelException", e);
   ```
   - ✅ The interface remains unchanged
   - ❌ Callers only see a generic IOException, which can obscure the specific cause (TunnelException)
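The three options can be compared in a small, self-contained sketch. Everything in it is a hypothetical stand-in so that it compiles without the ODPS SDK or SeaTunnel: TunnelException is a local checked exception, MaxcomputeConnectorException is a simplified unchecked exception that takes a message instead of an error code, and StubUpsertStream is an assumed one-method interface in place of the real upsert stream.

```java
import java.io.IOException;

// Hypothetical stand-in for the SDK's checked TunnelException.
class TunnelException extends Exception {
    TunnelException(String message) { super(message); }
}

// Simplified stand-in: the real class takes an error code such as WRITER_OPERATION_FAILED.
class MaxcomputeConnectorException extends RuntimeException {
    MaxcomputeConnectorException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical one-method stand-in for the upsert stream.
interface StubUpsertStream {
    void upsert(String row) throws IOException, TunnelException;
}

class WriteOptions {
    // Option 1: propagate. The throws clause grows, which an overriding write() cannot do.
    static void writePropagate(StubUpsertStream stream, String row) throws IOException, TunnelException {
        stream.upsert(row);
    }

    // Option 2: wrap in an unchecked exception, keeping the original as the cause.
    static void writeWrapUnchecked(StubUpsertStream stream, String row) throws IOException {
        try {
            stream.upsert(row);
        } catch (TunnelException e) {
            throw new MaxcomputeConnectorException("Writer operation failed", e);
        }
    }

    // Option 3: convert to IOException, which write() already declares.
    static void writeAsIOException(StubUpsertStream stream, String row) throws IOException {
        try {
            stream.upsert(row);
        } catch (TunnelException e) {
            throw new IOException("Failed to write due to TunnelException", e);
        }
    }
}
```

Option 1 compiles here only because writePropagate is a free-standing static method; in a class implementing an interface whose write declares only IOException, adding TunnelException to the throws clause would be rejected by the compiler.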
In this situation, I think option #2 is the most appropriate.
Here’s the relevant part of UpsertStreamImpl (decompiled; the loop has been restructured for readability without changing its behavior):
```java
private void write(Record record, Operation op, List<String> valueColumns) throws TunnelException, IOException {
    this.checkStatus();
    List<Integer> hashValues = new ArrayList<>();
    for (int key : this.hashKeys) {
        Object value = record.get(key);
        if (value == null) {
            // A missing primary-key value is rejected with a checked TunnelException.
            throw new TunnelException("UpsertRecord must have primary key value. Consider providing values for column '"
                    + this.schema.getColumn(key).getName() + "'");
        }
        TypeInfo typeInfo = this.schema.getColumn(key).getTypeInfo();
        if (typeInfo.getOdpsType() == OdpsType.DECIMAL) {
            // DECIMAL keys are hashed together with their declared precision and scale.
            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
            value = new DecimalHashObject((BigDecimal) value, decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
        }
        hashValues.add(TypeHasher.hash(typeInfo.getOdpsType(), value, this.session.getHasher()));
    }
    // ...
}
```
The exception thrown here is not recoverable: if a primary key value is missing, there is nothing meaningful the system can do to fix it at runtime.
In this case, it’s reasonable to convert TunnelException into a runtime exception.
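A minimal sketch of option 2 applied to the writer, under a simplified model: rows are plain Object arrays with the primary key assumed to be in column 0, a hypothetical Kind enum chooses between upsert and delete, and FakeUpsertStream imitates the primary-key check shown above. None of these classes are the real SeaTunnel or ODPS SDK types.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the SDK's checked TunnelException.
class TunnelException extends Exception {
    TunnelException(String message) { super(message); }
}

// Simplified stand-in: the real class takes an error code such as WRITER_OPERATION_FAILED.
class MaxcomputeConnectorException extends RuntimeException {
    MaxcomputeConnectorException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical operation kind; a real writer would derive this from the row itself.
enum Kind { UPSERT, DELETE }

// Hypothetical stream that records what it was asked to do.
class FakeUpsertStream {
    final List<String> log = new ArrayList<>();

    // Mimics the SDK check: a null primary-key value (assumed column 0) is rejected.
    private void checkPrimaryKey(Object[] values) throws TunnelException {
        if (values[0] == null) {
            throw new TunnelException("UpsertRecord must have primary key value");
        }
    }

    void upsert(Object[] values) throws IOException, TunnelException {
        checkPrimaryKey(values);
        log.add("upsert " + values[0]);
    }

    void delete(Object[] values) throws IOException, TunnelException {
        checkPrimaryKey(values);
        log.add("delete " + values[0]);
    }
}

class SketchWriter {
    private final FakeUpsertStream stream;

    SketchWriter(FakeUpsertStream stream) { this.stream = stream; }

    // Keeps an IOException-only signature, like write(SeaTunnelRow) throws IOException.
    public void write(Kind kind, Object[] values) throws IOException {
        try {
            if (kind == Kind.DELETE) {
                stream.delete(values);
            } else {
                stream.upsert(values);
            }
        } catch (TunnelException e) {
            // Not recoverable at runtime: wrap it and keep the original as the cause.
            throw new MaxcomputeConnectorException("Writer operation failed", e);
        }
    }
}
```

Because write keeps its IOException-only signature, it can still satisfy an interface method declared as write(...) throws IOException, while a missing primary key surfaces as an unchecked exception whose cause is the original TunnelException.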
Reference to a Similar Case
I found a similar discussion in this PR: apache/seatunnel#3640
In the PR, a reviewer suggested unifying the exceptions thrown by connectors, and the code was changed to:
```java
try {
    session.commit();
} catch (Exception e) {
    throw new MaxcomputeConnectorException(
            CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, e);
}
```
Why catch a broader Exception rather than only TunnelException?
There are two main reasons:
- Runtime exceptions such as NullPointerException, IllegalArgumentException, or other unexpected errors may occur within the method body, even though the method only declares TunnelException.
- If the underlying library is updated and starts throwing a new checked exception, catching Exception keeps the code compiling and avoids breaking the flow.
This is a form of defensive coding and makes the system more resilient to future changes.
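The difference between the two catch styles shows up when commit() fails with something it never declared. In this sketch, CommitStub is a hypothetical stand-in for the session, assumed to declare only TunnelException, and both exception classes are simplified local stand-ins rather than the real SDK and SeaTunnel types.

```java
// Hypothetical stand-in for the SDK's checked TunnelException.
class TunnelException extends Exception {
    TunnelException(String message) { super(message); }
}

// Simplified stand-in: the real class takes an error code such as WRITER_OPERATION_FAILED.
class MaxcomputeConnectorException extends RuntimeException {
    MaxcomputeConnectorException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical session whose commit() declares only TunnelException.
interface CommitStub {
    void commit() throws TunnelException;
}

class CommitSketch {
    // Narrow catch: only the declared checked exception is wrapped.
    static void commitNarrow(CommitStub session) {
        try {
            session.commit();
        } catch (TunnelException e) {
            throw new MaxcomputeConnectorException("Writer operation failed", e);
        }
    }

    // Broad catch, as in the PR: runtime exceptions are wrapped too, and a checked
    // exception added to commit() later would still compile and be handled.
    static void commitBroad(CommitStub session) {
        try {
            session.commit();
        } catch (Exception e) {
            throw new MaxcomputeConnectorException("Writer operation failed", e);
        }
    }
}
```

Because the original exception is always passed as the cause, the broad catch still keeps the NullPointerException and its stack trace available for debugging.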
✏️ In production environments, converting the library's checked exceptions into runtime exceptions may be the better choice, depending on the situation.