
Key: NUCRDBMS-400
Type: New Feature
Status: Open
Priority: Minor
Assignee: Unassigned
Reporter: Shane Whitehead
Votes: 1
Watchers: 2

DataNucleus Store RDBMS

A possible implementation of a Streamable Object to gain access to an InputStream to and from a Database

Created: 09/Oct/06 10:02 PM   Updated: 26/May/10 01:52 PM
Component/s: Types
Affects Version/s: None
Fix Version/s: None

File Attachments: 1. Zip Archive jpox-jta-sample.zip (22 kB)
2. Java Archive File omjjta.jar (16 kB)
3. XML File plugin.xml (0.5 kB)
4. Java Source File StreamableMapping.java (4 kB)
5. Java Source File StreamableObject.java (6 kB)
6. Java Source File StreamableRDMSMapping.java (5 kB)

Environment:
Developed against Windows XP, Java 1.5.0_06, Postgres 8.x.

Developed against Postgres's "bytea" byte array column type. Not tested against Blobs.

Datastore: PostgreSQL


Description
Provides a possible implementation for accessing an InputStream from the database and providing a means to write an InputStream to the database.

Includes a StreamableObject class to be used for fields, a StreamableMapping class for the Java type mapping and a StreamableRDBMSMapping class for the RDBMS mapping.

The intention is to provide a means by which large columns can be read at the leisure of the system, without needing to first download the full stream or provide secondary classes to handle the manipulation (it can all be done from within a single class).

The usage is relatively simple. Once the object has been retrieved from the database, simply retrieve the reference to the StreamableObject and use the "getBinaryStream" method to gain access to the input stream.

To update the value, create a new StreamableObject, passing in the new input stream and the stream's length, set it as the value of the appropriate field on the target object and persist.
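
The read and update steps described above might look roughly like the following. This is only a minimal, self-contained sketch: StreamHolder is a hypothetical stand-in that mirrors the constructor and accessors of the attached StreamableObject, the in-memory streams stand in for the datastore, and no JPOX calls are shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in mirroring the shape of StreamableObject (stream + declared length).
class StreamHolder {
    private final InputStream stream;
    private final long length;

    StreamHolder(InputStream stream, long length) {
        this.stream = stream;
        this.length = length;
    }

    InputStream getBinaryStream() { return stream; }

    long getStreamLength() { return length; }
}

class StreamUsageSketch {
    // Copies the held stream to the target in fixed-size chunks and returns the bytes copied.
    static long copy(StreamHolder holder, OutputStream target) throws IOException {
        byte[] buffer = new byte[1024];
        long total = 0;
        int read;
        InputStream in = holder.getBinaryStream();
        while ((read = in.read(buffer)) != -1) {
            target.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] content = "new document content".getBytes(StandardCharsets.UTF_8);
        // Update path: wrap the new stream together with its known length.
        StreamHolder replacement =
                new StreamHolder(new ByteArrayInputStream(content), content.length);
        // Read path: pull the bytes out on demand via getBinaryStream().
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = copy(replacement, out);
        System.out.println(copied + " of " + replacement.getStreamLength() + " bytes copied");
    }
}
```

In a real application the holder would be set on a persistent field and the object persisted; that part depends on the mapping classes below and is not reproduced here.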

This implementation is provided as proof of concept. Class, method and variable naming may be changed to better suit the requirements of the API and system. Modification can be made if required.

Feedback and suggestions are sought to improve the logic and implementation.

Mapping is as per usual and is very simple:
<plugin>
    <extension point="org.jpox.store_mapping">
        <mapping java-type="au.org.kaizen.jdo.StreamableObject"
            mapping-class="au.org.kaizen.jdo.StreamableMapping"/>
    </extension>

    <extension point="org.jpox.store_datastoremapping">
        <mapping java-type="au.org.kaizen.jdo.StreamableObject"
            rdbms-mapping-class="au.org.kaizen.jdo.StreamableRDBMSMapping"
            jdbc-type="LONGVARBINARY"
            sql-type="LONGVARBINARY"
            default="true" />
    </extension>
</plugin>

<jdo>
    <package name="...">
        
        <!-- Point to a class that contains a field of type StreamableObject -->
        <class name="..." identity-type="..." table="...">
            <field name="..." persistence-modifier="persistent">
                <column name="content" jdbc-type="LONGVARBINARY"/>
            </field>
        </class>

    </package>
</jdo>
--------------------------------------------------------------------------------
/*
 * StreamableObject.java
 *
 * Created on 7 October 2006, 17:44
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */

package au.org.kaizen.jdo;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * This class represents a streamable source of binary data that can either be
 * read from or written to a datasource.
 *
 * In short, a streamable object is an object whose source of information is
 * contained within an InputStream, either to be read from or written to the
 * datasource.
 *
 * It should be noted that once the class is initialised, its stream cannot
 * be reassigned. This is a design choice, as I didn't have enough knowledge
 * about JDO to determine whether it was possible to mark this class as being
 * dirty if it was updated in this fashion.
 *
 * It may be possible to provide a wrapper for this object that can be used to
 * monitor its state, but that is beyond the scope of this implementation.
 *
 * If you wish to change the contents of a column in the database, you need to
 * replace the field representing the binary column with a new instance holding
 * the new stream.
 *
 * It was decided to avoid using the InputStream.available call when trying to
 * determine the length of the stream, as it can't be relied on to always return
 * an accurate measure of the stream.
 *
 * It might seem feasible simply to map directly to the InputStream, but the JDBC
 * PreparedStatement needs to know the length of the stream in order to write
 * information to the datastore. As mentioned above, it may not be possible to
 * determine an accurate length of the binary stream simply by calling
 * InputStream.available. Ultimately, this class overcomes that issue.
 *
 * @author swhitehead
 */
public class StreamableObject
{
    
    private InputStream stream;
    
    /**
     * Determines the number of bytes that are to be written to the database
     */
    private long writeLength;
    
    /**
     * Creates a new instance of StreamableObject.
     *
     * This method is called by the mapping class when it retrieves the input stream
     * from the datasource.
     *
     * You are discouraged from calling this method if you wish to write the stream to
     * the datastore as the "getStreamLength" method will return 0 and no information
     * will be written.
     * @param stream Datasource input stream representing the binary information
     */
    public StreamableObject(InputStream stream)
    {
        setBinaryStream(stream);
    }

    /**
     * Creates a new instance of StreamableObject.
     *
     * This method should be used when creating a new binary stream. This constructor
     * requires the stream length to be passed in as a requirement for the JDBC API
     * setBinaryStream call.
     * @param stream The new binary stream to be written to the datastore
     * @param writeLength The number of bytes that this stream contains (or will contain)
     */
    public StreamableObject(InputStream stream, long writeLength)
    {
        setBinaryStream(stream);
        setStreamLength(writeLength);
    }
    
    /**
     * Method for setting the binary stream.
     * @param stream The binary stream represented by this class
     */
    protected void setBinaryStream(InputStream stream)
    {
        this.stream = stream;
    }
    
    /**
     * Returns the binary stream being managed by this class
     * @return The binary stream
     */
    public InputStream getBinaryStream()
    {
        return stream;
    }

    /**
     * Sets the stream's length which is reported to the JDBC.setBinaryStream method
     * when the data is written to the datastore
     * @param length The binary length of the stream
     */
    public void setStreamLength(long length)
    {
        writeLength = length;
    }
    
    /**
     * Returns the number of bytes that are to be written by the write stream
     * @return Returns the length of the binary stream
     */
    public long getStreamLength()
    {
        return writeLength;
    }

    /**
     * String representation of the class
     * @return A string representation of the class
     */
    public String toString()
    {
        StringBuffer sb = new StringBuffer("StreamableObject: ");
        
        if (stream == null)
        {
            sb.append("stream = null");
        }
        else
        {
            try
            {
                sb.append("stream.available = ").append(stream.available());
            }
            catch (IOException ex)
            {
                sb.append("stream unable to determine available due to error - ").append(ex.getMessage());
            }
        }
        
        return sb.toString();
    }
    
    /**
     * Convenience method for dumping the binary stream to the supplied output stream
     * @param os The OutputStream to which the binary stream is to be written
     * @return Returns the number of bytes written
     * @throws java.io.IOException Thrown if an IO error occurs during the write process
     */
    public long writeToStream(OutputStream os) throws IOException
    {
        byte[] buffer = new byte[1024];
        InputStream is = getBinaryStream();
        int bytesRead = -1;
        // long, to match the return type and avoid overflow on streams over 2 GB
        long totalBytes = 0;
        while ((bytesRead = is.read(buffer)) > -1)
        {
            os.write(buffer, 0, bytesRead);
            totalBytes += bytesRead;
        }
        
        return totalBytes;
    }

}
--------------------------------------------------------------------------------
/*
 * StreamableMapping.java
 *
 * Created on 7 October 2006, 16:58
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */

package au.org.kaizen.jdo;

import javax.jdo.JDOFatalException;
import org.apache.log4j.Logger;
import org.jpox.ClassLoaderResolver;
import org.jpox.metadata.AbstractPropertyMetaData;
import org.jpox.store.DatastoreAdapter;
import org.jpox.store.DatastoreContainerObject;
import org.jpox.store.expression.LogicSetExpression;
import org.jpox.store.expression.QueryExpression;
import org.jpox.store.expression.ScalarExpression;
import org.jpox.store.mapping.SingleFieldMapping;

/**
 * Represents the class mapping for the StreamableObject class
 * @author swhitehead
 */
public class StreamableMapping extends SingleFieldMapping
{
    
    private Logger logger;

    /**
     * Constructor for a mapping used by a Query.
     * @param dba Datastore Adapter
     * @param type Type of the field
     */
    public StreamableMapping(DatastoreAdapter dba, String type)
    {
        super(dba, type);
    }

    /**
     * Constructor for a mapping used by a datastore representation.
     * @param dba Datastore Adapter
     * @param fmd AbstractPropertyMetaData for the field being mapped
     * @param datastoreContainer Table containing the mapped object
     */
    public StreamableMapping(DatastoreAdapter dba, AbstractPropertyMetaData fmd, DatastoreContainerObject datastoreContainer)
    {
        super(dba, fmd, datastoreContainer);
    }

    /**
     * Constructor for a mapping used by a datastore representation.
     * @param dba Datastore Adapter
     * @param fmd AbstractPropertyMetaData for the field being mapped
     * @param datastoreContainer Table containing the mapped object
     * @param clr Classloader resolver (currently unused)
     */
    public StreamableMapping(DatastoreAdapter dba, AbstractPropertyMetaData fmd, DatastoreContainerObject datastoreContainer, ClassLoaderResolver clr)
    {
        super(dba, fmd, datastoreContainer);
    }
    
    /**
     * Convenience method for obtaining the logger for this class
     * @return A logger suitable for logging
     */
    protected Logger getLogger()
    {
        if (logger == null)
        {
            logger = Logger.getLogger(StreamableMapping.class);
        }
        
        return logger;
    }

    /**
     * Returns the Java class type represented by this mapping
     * @return Returns StreamableObject.class
     */
    public Class getJavaType()
    {
        return StreamableObject.class;
    }

    /**
     * Returns a sample StreamableObject
     * @param classLoaderResolver ClassLoaderResolver
     * @return A new instance of StreamableObject
     */
    public Object getSampleValue(ClassLoaderResolver classLoaderResolver)
    {
        return new StreamableObject(null);
    }

    /**
     * Overridden to throw an error. It is not possible to include a streamable
     * object in a query
     * @param qExpr QueryExpression
     * @param literal Object literal
     * @return Returns nothing - throws Fatal Exception
     */
    public ScalarExpression newLiteral(QueryExpression qExpr, Object literal)
    {
        throw new JDOFatalException(getJavaType().getName()+" is not supported in queries.");
    }

    /**
     * Overridden to throw an error. It is not possible to include a streamable
     * object in a query
     * @param qExpr QueryExpression
     * @param expr LogicalSetExpression
     * @return Returns nothing - throws Fatal Exception
     */
    public ScalarExpression newScalarExpression(QueryExpression qExpr, LogicSetExpression expr)
    {
        throw new JDOFatalException(getJavaType().getName()+" is not supported in queries.");
    }
    
}
--------------------------------------------------------------------------------
/*
 * StreamableRDBMSMapping.java
 *
 * Created on 8 October 2006, 07:50
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */

package au.org.kaizen.jdo;

import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import javax.jdo.JDODataStoreException;
import org.apache.log4j.Logger;
import org.jpox.store.DatastoreField;
import org.jpox.store.StoreManager;
import org.jpox.store.mapping.JavaTypeMapping;
import org.jpox.store.rdbms.mapping.LongVarcharRDBMSMapping;
import org.jpox.store.rdbms.typeinfo.TypeInfo;

/**
 * Represents the physical mapping of the StreamableObject to the JDBC information
 * @author swhitehead
 */
public class StreamableRDBMSMapping extends LongVarcharRDBMSMapping {

    private Logger logger;
    
    /**
     * Constructor.
     * @param storeMgr Manager for the store
     * @param mapping The java type mapping for the field
     */
    public StreamableRDBMSMapping(StoreManager storeMgr, JavaTypeMapping mapping)
    {
        super(storeMgr, mapping);
    }

    /**
     * Constructor.
     * @param mapping The java type mapping for the field.
     * @param storeMgr Manager for the store
     * @param field The field
     */
    public StreamableRDBMSMapping(JavaTypeMapping mapping, StoreManager storeMgr, DatastoreField field)
    {
        super(mapping, storeMgr, field);
    }
    
    /**
     * Convenience method for getting a suitable Logger for this class
     * @return A suitable logger reference
     */
    protected Logger getLogger()
    {
        if (logger == null)
        {
            logger = Logger.getLogger(StreamableRDBMSMapping.class);
        }
        
        return logger;
    }

    /**
     * Accessor for the type info for this datastore field
     * @return Type info for the datastore field
     */
    public TypeInfo getTypeInfo()
    {
        return getDatabaseAdapter().getTypeInfo(Types.LONGVARBINARY);
    }

    /**
     * Sets the value of the JDBC call to match the object.
     *
     * This maps the StreamableObject's stream (getBinaryStream) to the JDBC's
     * setBinaryStream method. As required by this method, it uses the
     * StreamableObject's stream length to tell the JDBC driver the number of bytes
     * expected by this stream.
     *
     * It is possible to pass a null value to this method.
     * @param ps The PreparedStatement being used
     * @param param The parameter index
     * @param value The StreamableObject being updated
     */
    public void setObject(Object ps, int param, Object value)
    {
        // Guard the log call: value may legitimately be null (handled below)
        getLogger().debug("SetObject..." + (value == null ? null : value.getClass()));
        try
        {
            if (value == null)
            {
                getLogger().debug("Value is null...");
                ((PreparedStatement) ps).setNull(param, getTypeInfo().dataType);
            }
            else if (value instanceof StreamableObject)
            {
                StreamableObject so = (StreamableObject)value;
                getLogger().debug("Stream length = " + so.getStreamLength());
                ((PreparedStatement) ps).setBinaryStream(param, so.getBinaryStream(), (int)so.getStreamLength());
            }
        }
        catch (SQLException e)
        {
            throw new JDODataStoreException(LOCALISER.msg("RDBMS.Mapping.UnableToSetParam","Object", "" + value, column, e.getMessage()), e);
        }
    }

    /**
     * This method retrieves the BinaryStream InputStream reference from the JDBC
     * datastore and wraps it in a StreamableObject.
     *
     * This method simply calls the JDBC ResultSet.getBinaryStream method to retrieve
     * the reference to the datastore InputStream
     * @param rs The result set from the datastore
     * @param param The column index to find the stream.
     * @return Returns a StreamableObject representing the InputStream of the binary stream
     */
    public Object getObject(Object rs, int param)
    {
        getLogger().debug("getObject...");
        StreamableObject so = null;

        try
        {
            getLogger().debug("Get stream from resultset...");
            InputStream is = ((ResultSet) rs).getBinaryStream(param);
            if( !((ResultSet) rs).wasNull() )
            {
                getLogger().debug("Create new streamable object...");
                so = new StreamableObject(is);
            }
        }
        catch (SQLException e)
        {
            throw new JDODataStoreException(LOCALISER.msg("RDBMS.Mapping.UnableToGetParam","Object", "" + param, column, e.getMessage()), e);
        }

        getLogger().debug("Return " + so + "...");
        return so;
    }
}

Shane Whitehead added a comment - 09/Oct/06 10:05 PM
Sample plugin

Shane Whitehead added a comment - 09/Oct/06 10:06 PM
Main wrapper class for the InputStream.

This is designed to meet the requirements of the JDBC PreparedStatement.setBinaryStream method

Shane Whitehead added a comment - 09/Oct/06 10:08 PM
A wrapper for the InputStream

This was designed to meet the needs of the JDBC PreparedStatement.setBinaryStream method

Shane Whitehead added a comment - 09/Oct/06 10:09 PM
The RDBMS mapping for the StreamableObject class...this is the workhorse class

Guido Anzuoni added a comment - 12/Oct/06 07:22 AM
I see a potential danger in holding the InputStream in StreamableObject instead of a
"consume" InputStream approach.
What happens if this stream is not read and closed and the ResultSet/Statement/Connection is closed ?


Shane Whitehead added a comment - 12/Oct/06 12:18 PM
The comment by Guido is a valid one. Having double-checked the Java API docs, the documentation for ResultSet.getBinaryStream states:

"Note: All the data in the returned stream must be read prior to getting the value of any other column. The next call to a getter method implicitly closes the stream. Also, a stream may return 0 when the method InputStream.available is called, whether there is data available or not."

This would lead me to believe that the result set may be handling the management of the stream.

The intention of the implementation was to provide an alternative means of accessing large amounts of data from the database without loading it all into memory, and also to help deal with values whose length exceeds Integer.MAX_VALUE. It was not intended to be used within a multiple-record result set.
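
On the point about lengths beyond Integer.MAX_VALUE: the posted setObject casts the stream length to int before calling setBinaryStream, which would silently truncate a larger value. A guard along the following lines could make that case fail loudly instead. LengthGuard and toJdbcLength are hypothetical names used only for illustration; they are not part of JPOX or JDBC.

```java
// Hypothetical helper: validates a long stream length before it is narrowed to int.
class LengthGuard {
    // Returns the length as an int, or throws if it is negative or does not fit in an int.
    static int toJdbcLength(long length) {
        if (length < 0 || length > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Stream length out of int range: " + length);
        }
        return (int) length;
    }
}
```

A mapping could call such a helper at the point where the length is handed to the driver, so that an oversized stream is rejected rather than written short.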

The implementation I designed it for had two objects: one representing the properties of a "document" (name, date, etc.) and a second representing the actual physical content. While all the information was stored in the same table, I decided to split the two elements in this manner so that you were forced to access the document content directly, using the "key" value of the properties object.

pm.getObjectById(DocumentContent.class, docProperties.key);

The implementation a user chooses will depend on their requirements, but they should be made aware of the shortcomings of this approach. It is intended for columns containing a megabyte or more of data.

Guido Anzuoni added a comment - 12/Oct/06 01:21 PM
If I have understood the semantics of the field flags correctly, it should be possible to instruct the StateManager that
a certain member should always be fetched from the datastore. This means that every time I access a member XX
that is mapped as a BLOB, an SQL statement is triggered to read the content.
Unfortunately, I don't think that this behaviour would help.
In fact, the state manager is triggered by member access, but by the time the InputStream extracted from the result set
reaches the application, the state manager will presumably have already closed the connection.
The only solution I see is one that consumes the InputStream **before** the StateManager read trigger
returns.
I have already pointed out a possible solution in this post:
http://www.jpox.org/servlet/forum/viewthread?thread=3358
Mixing the two approaches, the proposed StreamableObject should spool (somewhere) the InputStream received
in the constructor.
Where is the problem now?
Spool cleanup (and, obviously, sufficient space).
I don't think it is possible to have an EvictionListener to receive eviction events.
Otherwise we must rely on finalize().
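
The spooling idea could be sketched roughly as follows: copy the datastore stream to a temporary file while the result set is still open, remember the byte count, and delete the file when the holder is closed. SpooledStream is a hypothetical name, and the explicit close() is an assumption about how callers would clean up; it stands in for the finalize()-based cleanup mentioned above rather than reproducing it.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical holder that consumes the source stream eagerly into a temp file.
class SpooledStream implements Closeable {
    private final Path spoolFile;
    private final long length;

    SpooledStream(InputStream source) throws IOException {
        spoolFile = Files.createTempFile("spool-", ".bin");
        long copied;
        try {
            // Files.copy returns the number of bytes read from the stream.
            copied = Files.copy(source, spoolFile, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            // Do not leave a half-written spool file behind.
            Files.deleteIfExists(spoolFile);
            throw e;
        }
        length = copied;
    }

    // Exact length, known because the stream was fully consumed.
    long getLength() { return length; }

    // Opens a fresh stream over the spooled content; the caller closes it.
    InputStream open() throws IOException { return Files.newInputStream(spoolFile); }

    // Spool cleanup: removes the temporary file.
    @Override
    public void close() throws IOException { Files.deleteIfExists(spoolFile); }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[4096];
        try (SpooledStream spool = new SpooledStream(new ByteArrayInputStream(payload))) {
            System.out.println("spooled bytes: " + spool.getLength());
        }
    }
}
```

A side effect of spooling is that the length becomes known without relying on InputStream.available, at the cost of the disk space already noted.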



Guido Anzuoni added a comment - 12/Oct/06 01:31 PM
Just an addition to my previous comment.
It would be better not to be forced to use a StreamableObject.
If the StreamableRDBMSMapping could use a StreamableFieldProvider declared as

interface StreamableFieldProvider {
public long getFieldSize(Object field);
public java.io.InputStream getFieldStream(Object field) throws java.io.IOException;

public Object unmarshal(java.io.InputStream is, long size) throws java.io.IOException;
}

you could have a File member and a Provider like

class FileStreamableFieldProvider implements StreamableFieldProvider {
       public long getFieldSize(Object field) {
              File f = (File) field;
              return (f != null) ? f.length() : 0;
       }

       public java.io.InputStream getFieldStream(Object field) throws java.io.IOException {
              File f = (File) field;
              // new FileInputStream(f) can throw FileNotFoundException, hence the throws clause
              return (f != null) ? new FileInputStream(f) : null;
       }

       public Object unmarshal(java.io.InputStream is, long size) throws java.io.IOException {
              File f = createSpoolFile();
              //copy is to File f
              return f;
       }
}


Guido Anzuoni added a comment - 01/Feb/07 03:18 PM
In the archive there is the implementation of the mapping.
There is also a new main (MainJpoxJta.java) to run the sample in a JTA env.
To run this test you need:
shiftone for jndi implementation
jotm for jta implementation
omjjta.jar attached to the issue for XA wrapping of normal DataSource

Andy Jefferson added a comment - 26/Jul/09 10:41 AM
No point in having something assigned when obviously not working on it