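My application uses JPA/EclipseLink with MySQL as the database, and I have run into a concurrency problem. I need some advice.
I have a client that generates blocks of strings in a file. Each block can contain several lines, up to 100 or 200. As soon as the data is generated, the client starts sending a set of blocks to the server, where each block is parsed and stored in the database. The server uses JPA/EclipseLink to talk to MySQL.
When the client sends a set of blocks synchronously (over REST), everything works as expected: the data is parsed and stored in the database. However, when the client sends the data in parallel (15 concurrent threads in my case), some of the data (a few rows) is skipped, but only the first time it is stored in the database. When the same data is sent in parallel again (a second time, a third time, and so on), it works as expected and no rows are skipped during storage.
To send in parallel, purely for testing purposes, I store all the rows in a concurrent map. I don't see any skipped rows in the map, so this is purely a JPA and/or MySQL problem. I tried optimistic and pessimistic locking in JPA, but neither helped.
If anyone has run into this situation, could you tell me what to do?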
Here are my three relevant entity classes:
@Entity
@Table(name = "PROJECT")
@NamedQueries({
@NamedQuery(name = "Project.findByKey", query = "SELECT p FROM Project p WHERE p.key = :P_KEY",
hints = {
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE, value = HintValues.TRUE),
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_SIZE, value = "1000"),
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_IGNORE_NULL, value = HintValues.TRUE),
@QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE)
}
)
})
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType
@CascadeOnDelete
@OptimisticLocking
public class Project implements Serializable {
@Id
@TableGenerator(name = "p-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "PROJECT_SEQ", allocationSize = 50)
@GeneratedValue(strategy = GenerationType.TABLE, generator = "p-table-gen")
private Long id;
@Column(name = "DURATION")
private Long duration;
@Column(name = "PROGRAM")
private String program;
@Embedded
@AttributeOverrides(
{
@AttributeOverride(name = "parentHash", column = @Column(name = "PARENT_CMD_HASH")),
@AttributeOverride(name = "hash", column = @Column(name = "CMD_HASH")),
@AttributeOverride(name = "pathHash", column = @Column(name = "PATH_HASH"))
}
)
ProjectKey key = new ProjectKey();
@BatchFetch(BatchFetchType.EXISTS)
@OneToMany(mappedBy="project", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true )
@XmlTransient
@CascadeOnDelete
private List<Task> tasks = new Vector<Task>();
@BatchFetch(BatchFetchType.EXISTS)
@OneToOne(mappedBy="project", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true )
@XmlTransient
@CascadeOnDelete
private Environment environment;
@Version
@Column(name = "VERSION")
private Long version;
}
@Entity
@Table(name = "TASK")
@NamedQueries({
@NamedQuery(name = "Task.findTaskByBuildAndPathName",
query = "SELECT t FROM Task t WHERE t.operation = 'W' AND t.key.pathName = :PATH_NAME ORDER BY t.startTime",
hints = {
@QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE)
}
)
})
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType
@CascadeOnDelete
@OptimisticLocking
public class Task implements Serializable, Constants {
private static final long serialVersionUID = 1L;
@Id
@TableGenerator(name = "t-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "TASK_SEQ", allocationSize = 200)
@GeneratedValue(strategy = GenerationType.TABLE, generator = "t-table-gen")
private Long id;
@Column(name = "HASH")
private String hash;
@Column(name = "OPERATION")
private String operation;
@Column(name = "PROCESS_ID")
private BigInteger processId;
@Embedded
@AttributeOverride(name = "nanoseconds", column = @Column(name = "START_TIME"))
private Moment startTime;
@Column(name = "THREAD_ID")
private BigInteger threadId;
@Column(name = "THROUGHPUT")
private Float throughput;
@Column(name = "PROCESSED_BYTES")
private Float processedBytes;
@BatchFetch(BatchFetchType.JOIN)
@ManyToOne(cascade = { CascadeType.MERGE, CascadeType.REMOVE}, fetch=FetchType.EAGER)
@JoinColumn(name = "PATH_STATE_ID", insertable = false, updatable = true)
@CascadeOnDelete
private TaskKey key;
@ManyToOne
@JoinColumn(name = "PROJECT_ID")
@XmlTransient
@CascadeOnDelete
private Project project;
@Version
@Column(name = "VERSION")
private Long version;
}
@Entity
@Index(name="IDX_TS_SIZE_TIME_INDEX", columnNames={"SIZE","TIME"})
@Table(name = "TASK_STATE")
@NamedQueries({
@NamedQuery(name = "TaskState.findByKey", query = "SELECT ts FROM TaskState ts WHERE ts.key = :TS_KEY",
hints = {
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE, value = HintValues.TRUE),
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_SIZE, value = "2000"),
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_IGNORE_NULL, value="true"),
@QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE)
}
)
})
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType
public class TaskState implements Serializable {
@Id
@TableGenerator(name = "ts-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "TASK_STATE_SEQ", allocationSize = 50)
@GeneratedValue(strategy = GenerationType.TABLE, generator = "ts-table-gen")
private Long id;
@Lob
@Basic(fetch = FetchType.LAZY)
private byte[] artifact;
@Column(name = "MODE")
private String mode;
@Embedded
@AttributeOverrides({
@AttributeOverride(name = "nanoseconds", column = @Column(name = "TIME")),
@AttributeOverride(name = "size", column = @Column(name = "SIZE")),
@AttributeOverride(name = "pathName", column = @Column(name = "NAME")),
@AttributeOverride(name = "fileHash", column = @Column(name = "HASH"))
})
TaskStateKey key = new TaskStateKey();
@BatchFetch(BatchFetchType.EXISTS)
@OneToMany(mappedBy = "taskState", orphanRemoval = true, cascade = { CascadeType.REMOVE})
@XmlTransient
@CascadeOnDelete
private List<Task> tasks = new Vector<Task>();
}
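Update
I think I have now found the root cause, but I haven't found a solution yet.
Initially I create a project in the database and then try to look up the related taskState. At first there are no taskStates, so I create them one by one. This is the sequence of operations on taskState:
1. Try to find the taskState by key; if it exists, return it, otherwise
2. Persist/insert a new taskState (when multiple threads try to insert, I get a duplicate entry error, so I retry the insert until it succeeds) and return the newly created pathState.
I see skipped rows whenever the current insert of a pathState hits a duplicate entry error and the pathState is only saved successfully on the second or third attempt. I could enable the SERIALIZABLE isolation level on the database, but I can't afford that in my case, and I doubt SERIALIZABLE isolation would solve my problem anyway.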
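For readers following along, the two steps above can be sketched without a database. This is only an illustration, not the code from my application: `TaskStateStore`, `DuplicateEntryException`, and `findOrCreate` are hypothetical names, the store is an in-memory stand-in for the TASK_STATE table, and it assumes the duplicate entry error comes from a uniqueness constraint on the key (the post does not say which constraint fails). On a duplicate entry error the sketch goes back to step 1 and looks the row up again, rather than blindly repeating the insert.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

class FindOrCreateSketch {

    /** Hypothetical stand-in for the database's duplicate entry error. */
    static class DuplicateEntryException extends RuntimeException {
        DuplicateEntryException(String key) {
            super("Duplicate entry: " + key);
        }
    }

    /** Hypothetical in-memory stand-in for a table with a unique key. */
    static class TaskStateStore {
        private final ConcurrentHashMap<String, Long> rows = new ConcurrentHashMap<>();
        private final AtomicLong nextId = new AtomicLong(1);

        /** Step 1: look the row up by key. */
        Optional<Long> findByKey(String key) {
            return Optional.ofNullable(rows.get(key));
        }

        /** Step 2: insert a new row, failing if the key already exists. */
        long insert(String key) {
            long id = nextId.getAndIncrement();
            // putIfAbsent returns the existing value when another thread won the race.
            if (rows.putIfAbsent(key, id) != null) {
                throw new DuplicateEntryException(key);
            }
            return id;
        }

        int size() {
            return rows.size();
        }
    }

    /** Find the row by key, otherwise insert it; re-run the lookup after a lost race. */
    static long findOrCreate(TaskStateStore store, String key) {
        while (true) {
            Optional<Long> existing = store.findByKey(key);
            if (existing.isPresent()) {
                return existing.get();
            }
            try {
                return store.insert(key);
            } catch (DuplicateEntryException e) {
                // Another thread inserted the same key first; loop back to step 1.
            }
        }
    }

    public static void main(String[] args) throws Exception {
        TaskStateStore store = new TaskStateStore();
        ExecutorService pool = Executors.newFixedThreadPool(15);
        List<Callable<Long>> calls = new ArrayList<>();
        for (int i = 0; i < 15; i++) {
            calls.add(() -> findOrCreate(store, "same-key"));
        }
        Set<Long> ids = new HashSet<>();
        for (Future<Long> result : pool.invokeAll(calls)) {
            ids.add(result.get());
        }
        pool.shutdown();
        System.out.println("distinct ids: " + ids.size() + ", rows: " + store.size());
    }
}
```

In this sketch all 15 threads end up with the same id and the store holds a single row. Whether the same holds against MySQL depends on the real constraint and on transaction boundaries, which the sketch does not model.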
Best answer
Change the ID generation strategy of TaskState from TABLE to IDENTITY; this lets MySQL assign the ID of the inserted row itself.
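A minimal sketch of what that change might look like on the `TaskState` entity from the question. It assumes the `javax.persistence` package (newer stacks use `jakarta.persistence` instead) and that the ID column of TASK_STATE is declared `AUTO_INCREMENT` in MySQL, which `GenerationType.IDENTITY` relies on; the question does not show the schema, so treat both as assumptions. Everything else in the class is left as it was.

```java
import java.io.Serializable;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "TASK_STATE")
public class TaskState implements Serializable {

    // IDENTITY delegates ID assignment to the database on insert.
    // This replaces the @TableGenerator / GenerationType.TABLE pair;
    // it assumes the column is AUTO_INCREMENT in MySQL.
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // Remaining fields, named queries, and mappings stay as in the question.
}
```

With IDENTITY the ID is only known after the row has been inserted, so this trades the preallocated ID ranges of the TABLE generator (`allocationSize = 50` in the question) for database-assigned values.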